Add collection management and upsert functionality to pgvecter API

- Enhanced `main.go` with new types and handlers for managing collections and upserting documents, including validation and error handling.
- Updated `openapi.yaml` to define new endpoints for listing collections and admin access.
- Introduced `CollectionsResponse` and `CollectionItem` schemas for API responses.
- Added a new migration script to include a `collection` column in the `kb_doc_chunks` table.
- Expanded `README.md` with usage examples for the new collection features and environment variable configurations.
This commit is contained in:
president
2026-02-06 17:44:23 +09:00
parent a8bff893dc
commit 277d152f9c
6 changed files with 1183 additions and 28 deletions

README.md

@@ -9,9 +9,365 @@ MCP server bridge
- `ADMIN_API_KEY`: authentication key for the admin API
- `DATABASE_URL`: PostgreSQL connection string (falls back to in-memory storage if unset)
- `PORT`: HTTP port (default `8080`)
- `EMBEDDING_PROVIDER`: `llamacpp` or `openai` (default `llamacpp`)
- `LLAMA_CPP_URL`: URL of the llama.cpp embed API (e.g. `http://127.0.0.1:8092`)
- `EMBEDDING_DIM`: embedding dimension (default `1024`)
- `OPENAI_API_KEY`: required only when using OpenAI embeddings
- `EMBEDDING_MODEL`: OpenAI embeddings model name (default `text-embedding-3-small`)
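For reference, the fallback logic these variables imply can be sketched in Python (illustrative only; `resolve_config` is a hypothetical helper mirroring the README's documented defaults, not part of the codebase):

```python
def resolve_config(env: dict) -> dict:
    """Resolve embedding settings from environment variables, using the documented defaults."""
    provider = env.get("EMBEDDING_PROVIDER", "").strip() or "llamacpp"
    dim_raw = env.get("EMBEDDING_DIM", "").strip()
    # Fall back to 1024 unless EMBEDDING_DIM is a positive integer
    dim = int(dim_raw) if dim_raw.isdigit() and int(dim_raw) > 0 else 1024
    return {
        "provider": provider,
        "llama_cpp_url": env.get("LLAMA_CPP_URL", "http://127.0.0.1:8092"),
        "embedding_dim": dim,
        "port": env.get("PORT", "8080"),
    }

print(resolve_config({}))
```

With an empty environment this yields `llamacpp`, dimension `1024`, and port `8080`, matching the defaults listed above.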
### Startup
```bash
ADMIN_API_KEY=your-admin-key DATABASE_URL=postgres://... go run .
ADMIN_API_KEY=your-admin-key \
DATABASE_URL=postgres://... \
EMBEDDING_PROVIDER=llamacpp \
LLAMA_CPP_URL=http://127.0.0.1:8092 \
EMBEDDING_DIM=1024 \
go run .
```
### llama.cpp (embedding) startup example (Mac)
```bash
/Users/sunamurahideyuki/LLM/llama.cpp/build/bin/llama-server \
-m /Users/sunamurahideyuki/LLM/llama.cpp/models/bge-m3-Q4_K_M.gguf \
--embeddings -t 6 -c 2048 --host 127.0.0.1 --port 8092
```
### Admin UI
```text
http://localhost:8080/admin?admin_key=your-admin-key
```
### Basic operations
#### Issuing an API key
```bash
curl -s -X POST http://localhost:8080/admin/api-keys \
-H "X-ADMIN-API-KEY: your-admin-key" \
-H "Content-Type: application/json" \
-d '{"label":"dev","permissions_users":["user_123"]}'
```
#### upsert (auto-generated embeddings / auto_chunk)
```bash
python3 - <<'PY' | \
curl -s -X POST http://localhost:8080/kb/upsert \
-H "X-API-KEY: pgv_..." \
-H "Content-Type: application/json" \
-d @-
import json, uuid
long_text = "\n\n".join(["段落テキスト " * 200 for _ in range(6)])
print(json.dumps({
"id": str(uuid.uuid4()),
"content": long_text,
"collection": "dev-diary",
"auto_chunk": True,
"metadata": {"permissions":{"users":["tampered_user"]}}
}))
PY
```
#### search (auto-generated query_embedding)
```bash
python3 - <<'PY' | \
curl -s -X POST http://localhost:8080/kb/search \
-H "X-API-KEY: pgv_..." \
-H "Content-Type: application/json" \
-d @-
import json
print(json.dumps({
"query": "hello world",
"collection": "dev-diary",
"top_k": 5
}))
PY
```
#### Listing collections
```bash
curl -s http://localhost:8080/collections \
-H "X-API-KEY: pgv_..."
```
## VPS operations notes
### llama-embed.service (VPS)
- Confirm the service is active
```bash
sudo systemctl status llama-embed.service
```
- Health check
```bash
curl -s http://127.0.0.1:8092/health
```
- Embedding check (1024 dimensions)
```bash
curl -s http://127.0.0.1:8092/embedding \
-H 'Content-Type: application/json' \
-d '{"content":"次元チェック"}' | jq '.[0].embedding[0] | length'
```
### DB setup (VPS)
```bash
sudo -u postgres psql
```
```sql
-- New database/user
CREATE USER pgvecter_api WITH PASSWORD 'YOUR_STRONG_PASSWORD';
CREATE DATABASE pgvecter_api OWNER pgvecter_api;
\c pgvecter_api
CREATE EXTENSION IF NOT EXISTS vector;
CREATE EXTENSION IF NOT EXISTS pgcrypto;
-- Table
CREATE TABLE IF NOT EXISTS kb_doc_chunks (
id uuid PRIMARY KEY,
doc_id uuid NOT NULL,
chunk_index integer NOT NULL,
collection text NOT NULL DEFAULT 'default',
content text NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}',
embedding vector(1024) NOT NULL,
updated_at timestamptz NOT NULL DEFAULT now()
);
CREATE INDEX IF NOT EXISTS kb_doc_chunks_embedding_hnsw
ON kb_doc_chunks USING hnsw (embedding vector_cosine_ops);
CREATE INDEX IF NOT EXISTS kb_doc_chunks_collection_idx
ON kb_doc_chunks (collection);
-- Privileges
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO pgvecter_api;
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO pgvecter_api;
```
When adding collections to an existing DB:
```sql
ALTER TABLE kb_doc_chunks
ADD COLUMN IF NOT EXISTS collection text NOT NULL DEFAULT 'default';
CREATE INDEX IF NOT EXISTS kb_doc_chunks_collection_idx
ON kb_doc_chunks (collection);
```
### API key workflow
1. An administrator issues an API key from `/admin`
2. Set `permissions.users` at issue time
3. The issued key is **shown only once**, so store it securely
4. When a key must be invalidated, revoke it immediately from the admin UI
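Step 3 holds because the server persists only a SHA-256 digest of each key (see `hashKey` in `main.go`), never the raw `pgv_...` value; an equivalent sketch in Python (`hash_key` is illustrative, not an existing helper):

```python
import hashlib

def hash_key(raw: str) -> str:
    """Return the hex SHA-256 digest of a raw API key, like hashKey in main.go."""
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

# Only this digest needs to be stored server-side; an incoming
# X-API-KEY header is hashed and compared against it.
digest = hash_key("pgv_example_key")
print(len(digest))  # 64 hex characters
```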
### systemd (pgvecter API) example
```ini
[Unit]
Description=pgvecter API
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/pgvecterAPI
Environment=ADMIN_API_KEY=your-admin-key
Environment=DATABASE_URL=postgres://pgvecter_api:YOUR_STRONG_PASSWORD@127.0.0.1:5432/pgvecter_api?sslmode=disable
Environment=EMBEDDING_PROVIDER=llamacpp
Environment=LLAMA_CPP_URL=http://127.0.0.1:8092
Environment=EMBEDDING_DIM=1024
Environment=PORT=8080
ExecStart=/usr/local/bin/go run .
Restart=always
RestartSec=3
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
```
### nginx reverse proxy example
```nginx
server {
listen 80;
server_name api.example.com;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
```
### HTTPS (Let's Encrypt) setup example
```bash
sudo apt update
sudo apt install -y certbot python3-certbot-nginx
sudo certbot --nginx -d api.example.com
```
```nginx
server {
listen 443 ssl;
server_name api.example.com;
ssl_certificate /etc/letsencrypt/live/api.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/api.example.com/privkey.pem;
location / {
proxy_pass http://127.0.0.1:8080;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
```
### systemd (binary execution) example
```ini
[Unit]
Description=pgvecter API
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/pgvecterAPI
EnvironmentFile=/opt/pgvecterAPI/.env
ExecStart=/opt/pgvecterAPI/pgvecter-api
Restart=always
RestartSec=3
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
```
### Memory/CPU limit best practices
- systemd resource limits
```ini
[Service]
MemoryMax=512M
CPUQuota=200%
TasksMax=200
```
- Example nginx timeout settings
```nginx
proxy_connect_timeout 5s;
proxy_read_timeout 30s;
proxy_send_timeout 30s;
```
## Deployment steps (VPS)
1. Place the repository
```bash
sudo mkdir -p /opt/pgvecterAPI
sudo chown $USER:$USER /opt/pgvecterAPI
git clone <repo-url> /opt/pgvecterAPI
cd /opt/pgvecterAPI
```
2. Build (Go)
```bash
go build -o pgvecter-api .
```
3. Configure `.env`
```bash
cat > /opt/pgvecterAPI/.env <<'ENV'
ADMIN_API_KEY=your-admin-key
DATABASE_URL=postgres://pgvecter_api:YOUR_STRONG_PASSWORD@127.0.0.1:5432/pgvecter_api?sslmode=disable
EMBEDDING_PROVIDER=llamacpp
LLAMA_CPP_URL=http://127.0.0.1:8092
EMBEDDING_DIM=1024
PORT=8080
ENV
```
4. Apply the systemd unit
```bash
sudo cp /opt/pgvecterAPI/pgvecter-api.service /etc/systemd/system/pgvecter-api.service
sudo systemctl daemon-reload
sudo systemctl enable --now pgvecter-api.service
```
5. Apply the nginx config
```bash
sudo ln -s /etc/nginx/sites-available/pgvecter-api.conf /etc/nginx/sites-enabled/pgvecter-api.conf
sudo nginx -t && sudo systemctl reload nginx
```
## Operations checklist
- [ ] `systemctl status pgvecter-api.service` shows `active (running)`
- [ ] `curl -s http://127.0.0.1:8080/health` returns `ok`
- [ ] `curl -s http://127.0.0.1:8092/health` returns `ok`
- [ ] DB connection is healthy (`SELECT 1;` succeeds)
- [ ] API keys can be issued/revoked from the admin UI
- [ ] `/kb/upsert` succeeds
- [ ] `/kb/search` returns results
### Minimum logging/monitoring
- Check logs via systemd/journal
```bash
journalctl -u pgvecter-api.service -f
```
- Health check
```bash
curl -s http://127.0.0.1:8080/health
```
- Embedding API health
```bash
curl -s http://127.0.0.1:8092/health
```
## Usage from other services
When calling from another repository (e.g. a chat bot), make the following two points explicit:
1. Always send `X-API-KEY`
2. Assume `permissions.users` is force-injected on the server side
### Example: /kb/search
```bash
curl -s -X POST http://localhost:8080/kb/search \
-H "X-API-KEY: pgv_..." \
-H "Content-Type: application/json" \
-d '{"query":"hello world","top_k":5}'
```
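The same request can also be assembled from Python; a minimal sketch (`build_search_request` is an illustrative helper, the `pgv_...` key is a placeholder, and the actual POST is left to the caller):

```python
import json

def build_search_request(query, top_k=5, collection=None):
    """Build headers and JSON body for POST /kb/search; collection is optional."""
    payload = {"query": query, "top_k": top_k}
    if collection:
        payload["collection"] = collection
    headers = {
        "X-API-KEY": "pgv_...",  # placeholder: substitute an issued key
        "Content-Type": "application/json",
    }
    return headers, json.dumps(payload)

headers, body = build_search_request("hello world", top_k=5, collection="dev-diary")
print(body)
```

Note that `permissions.users` is intentionally absent from the payload: per point 2, the server injects it from the API key.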


@@ -17,6 +17,7 @@
- Deployment target: undecided
- In-house PostgreSQL host: `labo.sunamura-llc.com`
- Target organization: single organization (employee use expected in the future)
- Embedding backend: llama.cpp (BGE-M3, 1024 dimensions)
## 3. Expected use cases
@@ -163,7 +164,7 @@
## 5.2 Candidate additional tables
- `kb_doc_chunks` (when splitting into chunks)
- `id`, `doc_id`, `collection`, `content`, `metadata`, `embedding`, `chunk_index`, `updated_at`
- `kb_sources` (when original-document references or permission integration are needed)
- `id`, `source_type`, `source_ref`, `metadata`, `updated_at`
- `kb_ingest_jobs` (when ingest/sync operations are needed)
@@ -174,11 +175,19 @@
- Chunking: required
- Permissions: stored in `metadata`
- Core business data: synced as well
- Embedding dimension: 1024
- Embedding model changes: may be swapped out later (record model name and dimension)
- Permission key: `permissions.users` only (for now)
- Sync method: periodic pull + diffs
- Chunk spec: 800 characters, 100-character overlap, paragraph-first
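For illustration, the window/overlap part of this chunk spec can be sketched as a character-based splitter (`split_by_chars` is a hypothetical helper; the actual `splitIntoChunks` in `main.go` additionally merges paragraphs first):

```python
def split_by_chars(text: str, size: int = 800, overlap: int = 100) -> list[str]:
    """Split text into size-character windows; consecutive windows share `overlap` characters."""
    if len(text) <= size:
        return [text]
    chunks, i = [], 0
    while i < len(text):
        end = min(i + size, len(text))
        chunks.append(text[i:end])
        if end == len(text):
            break
        i = end - overlap  # step back so the next chunk overlaps the previous one
    return chunks

print([len(c) for c in split_by_chars("x" * 2000)])  # [800, 800, 600]
```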
## 6.2 Embedding backend notes
- labo: `llama-embed.service` (http://127.0.0.1:8092)
- `/health` -> `{"status":"ok"}`
- `/embedding` -> accepts `{"content":"..."}`, returns 1024 dimensions
- Model: `bge-m3-Q4_K_M.gguf`
- A local (Mac) `llama.cpp` environment is also available
## 5.4 Candidate DDL (draft)
@@ -200,9 +209,10 @@ CREATE TABLE kb_doc_chunks (
id uuid PRIMARY KEY,
doc_id uuid NOT NULL REFERENCES kb_docs(id) ON DELETE CASCADE,
chunk_index integer NOT NULL,
collection text NOT NULL DEFAULT 'default',
content text NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}',
embedding vector(1024) NOT NULL,
embedding_model text NOT NULL,
embedding_dim int NOT NULL,
updated_at timestamptz NOT NULL DEFAULT now()

main.go

@@ -10,9 +10,11 @@ import (
"errors"
"fmt"
"log"
"math"
"net/http"
"os"
"sort"
"strconv"
"strings"
"sync"
"time"
@@ -44,6 +46,7 @@ type KbSearchRequest struct {
QueryEmbedding []float64 `json:"query_embedding"`
TopK int `json:"top_k"`
Filter map[string]interface{} `json:"filter"`
Collection string `json:"collection"`
}
type KbSearchResponse struct {
@@ -51,10 +54,43 @@ type KbSearchResponse struct {
}
type KbSearchResult struct {
ID string `json:"id"`
Collection string `json:"collection"`
Content string `json:"content"`
Metadata map[string]interface{} `json:"metadata"`
Score float64 `json:"score"`
}
type CollectionItem struct {
Name string `json:"name"`
Count int `json:"count"`
}
type CollectionsResponse struct {
Items []CollectionItem `json:"items"`
}
type KbUpsertRequest struct {
ID string `json:"id"`
DocID string `json:"doc_id"`
ChunkIndex int `json:"chunk_index"`
Collection string `json:"collection"`
Content string `json:"content"`
Metadata map[string]interface{} `json:"metadata"`
Embedding []float64 `json:"embedding"`
AutoChunk bool `json:"auto_chunk"`
ChunkSize int `json:"chunk_size"`
Overlap int `json:"chunk_overlap"`
}
type KbUpsertResponse struct {
ID string `json:"id"`
DocID string `json:"doc_id"`
Updated bool `json:"updated"`
Chunks []struct {
ID string `json:"id"`
ChunkIndex int `json:"chunk_index"`
} `json:"chunks,omitempty"`
}
type AdminApiKey struct {
@@ -364,6 +400,23 @@ func generateID() string {
hex.EncodeToString(buf[10:16])
}
func parseUUID(value string) (string, error) {
parts := strings.Split(value, "-")
if len(parts) != 5 {
return "", errors.New("invalid uuid format")
}
lengths := []int{8, 4, 4, 4, 12}
for i, part := range parts {
if len(part) != lengths[i] {
return "", errors.New("invalid uuid format")
}
if _, err := hex.DecodeString(part); err != nil {
return "", errors.New("invalid uuid format")
}
}
return value, nil
}
func hashKey(raw string) string {
sum := sha256.Sum256([]byte(raw))
return hex.EncodeToString(sum[:])
@@ -416,6 +469,164 @@ func handleNotImplemented(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusNotImplemented, "not_implemented", "endpoint not implemented yet")
}
func handleKbUpsert(db *pgxpool.Pool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if db == nil {
writeError(w, http.StatusInternalServerError, "db_not_configured", "DATABASE_URL not configured")
return
}
permissions, ok := r.Context().Value(ctxPermissionsUsers).([]string)
if !ok || len(permissions) == 0 {
writeError(w, http.StatusForbidden, "forbidden", "permissions.users not set")
return
}
var req KbUpsertRequest
if err := decodeJSON(r, &req); err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", err.Error())
return
}
if strings.TrimSpace(req.Content) == "" {
writeError(w, http.StatusBadRequest, "invalid_request", "content is required")
return
}
if strings.TrimSpace(req.Collection) == "" {
req.Collection = "default"
}
if req.ChunkSize == 0 {
req.ChunkSize = 800
}
if req.Overlap == 0 {
req.Overlap = 100
}
if req.ChunkSize < 200 {
writeError(w, http.StatusBadRequest, "invalid_request", "chunk_size is too small")
return
}
if req.Overlap < 0 || req.Overlap >= req.ChunkSize {
writeError(w, http.StatusBadRequest, "invalid_request", "chunk_overlap must be between 0 and chunk_size-1")
return
}
dim := embeddingDim()
if len(req.Embedding) > 0 && req.AutoChunk {
writeError(w, http.StatusBadRequest, "invalid_request", "embedding cannot be provided when auto_chunk is true")
return
}
if strings.TrimSpace(req.DocID) == "" {
req.DocID = generateID()
} else {
if _, err := parseUUID(req.DocID); err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "doc_id must be uuid")
return
}
}
if req.ChunkIndex < 0 {
writeError(w, http.StatusBadRequest, "invalid_request", "chunk_index must be >= 0")
return
}
if !req.AutoChunk {
if strings.TrimSpace(req.ID) == "" {
writeError(w, http.StatusBadRequest, "invalid_request", "id is required")
return
}
if _, err := parseUUID(req.ID); err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "id must be uuid")
return
}
}
metadata := req.Metadata
if metadata == nil {
metadata = map[string]interface{}{}
}
metadata["collection"] = req.Collection
perms := map[string]interface{}{
"users": permissions,
}
metadata["permissions"] = perms
metaBytes, err := json.Marshal(metadata)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", "metadata must be valid object")
return
}
if req.AutoChunk || len([]rune(req.Content)) > req.ChunkSize {
chunks := splitIntoChunks(req.Content, req.ChunkSize, req.Overlap)
resp := KbUpsertResponse{DocID: req.DocID, Updated: false}
for i, chunk := range chunks {
emb, err := createEmbeddingWithDim(r.Context(), chunk, dim)
if err != nil {
writeError(w, http.StatusBadRequest, "embedding_failed", err.Error())
return
}
vectorParam := vectorLiteral(emb)
id := generateID()
var updated bool
err = db.QueryRow(r.Context(), `
INSERT INTO kb_doc_chunks (id, doc_id, chunk_index, collection, content, metadata, embedding, updated_at)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::vector, now())
ON CONFLICT (id) DO UPDATE
SET doc_id = EXCLUDED.doc_id,
chunk_index = EXCLUDED.chunk_index,
collection = EXCLUDED.collection,
content = EXCLUDED.content,
metadata = EXCLUDED.metadata,
embedding = EXCLUDED.embedding,
updated_at = now()
RETURNING (xmax <> 0) AS updated
`, id, req.DocID, i, req.Collection, chunk, string(metaBytes), vectorParam).Scan(&updated)
if err != nil {
log.Printf("kb.upsert failed: %v", err)
writeError(w, http.StatusInternalServerError, "upsert_failed", fmt.Sprintf("failed to upsert document: %v", err))
return
}
resp.Updated = resp.Updated || updated
resp.Chunks = append(resp.Chunks, struct {
ID string `json:"id"`
ChunkIndex int `json:"chunk_index"`
}{ID: id, ChunkIndex: i})
}
if len(resp.Chunks) > 0 {
resp.ID = resp.Chunks[0].ID
}
writeJSON(w, http.StatusOK, resp)
return
}
if len(req.Embedding) == 0 {
emb, err := createEmbeddingWithDim(r.Context(), req.Content, dim)
if err != nil {
writeError(w, http.StatusBadRequest, "embedding_failed", err.Error())
return
}
req.Embedding = emb
}
if len(req.Embedding) != dim {
writeError(w, http.StatusBadRequest, "invalid_request", fmt.Sprintf("embedding must be length %d", dim))
return
}
vectorParam := vectorLiteral(req.Embedding)
var updated bool
err = db.QueryRow(r.Context(), `
INSERT INTO kb_doc_chunks (id, doc_id, chunk_index, collection, content, metadata, embedding, updated_at)
VALUES ($1, $2, $3, $4, $5, $6::jsonb, $7::vector, now())
ON CONFLICT (id) DO UPDATE
SET doc_id = EXCLUDED.doc_id,
chunk_index = EXCLUDED.chunk_index,
collection = EXCLUDED.collection,
content = EXCLUDED.content,
metadata = EXCLUDED.metadata,
embedding = EXCLUDED.embedding,
updated_at = now()
RETURNING (xmax <> 0) AS updated
`, req.ID, req.DocID, req.ChunkIndex, req.Collection, req.Content, string(metaBytes), vectorParam).Scan(&updated)
if err != nil {
log.Printf("kb.upsert failed: %v", err)
writeError(w, http.StatusInternalServerError, "upsert_failed", fmt.Sprintf("failed to upsert document: %v", err))
return
}
writeJSON(w, http.StatusOK, KbUpsertResponse{ID: req.ID, DocID: req.DocID, Updated: updated})
}
}
func handleKbSearch(db *pgxpool.Pool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if db == nil {
@@ -439,12 +650,20 @@ func handleKbSearch(db *pgxpool.Pool) http.HandlerFunc {
writeError(w, http.StatusBadRequest, "invalid_request", "top_k must be between 1 and 50")
return
}
if strings.TrimSpace(req.Collection) == "" {
req.Collection = "default"
}
dim := embeddingDim()
if len(req.QueryEmbedding) == 0 {
emb, err := createEmbeddingWithDim(r.Context(), req.Query, dim)
if err != nil {
writeError(w, http.StatusBadRequest, "embedding_failed", err.Error())
return
}
req.QueryEmbedding = emb
}
if len(req.QueryEmbedding) != dim {
writeError(w, http.StatusBadRequest, "invalid_request", fmt.Sprintf("query_embedding must be length %d", dim))
return
}
@@ -466,21 +685,21 @@ func handleKbSearch(db *pgxpool.Pool) http.HandlerFunc {
}
}
whereSQL, params, nextIdx, err := buildFilterSQL(mergedFilter, 3)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", err.Error())
return
}
vectorParam := vectorLiteral(req.QueryEmbedding)
params = append([]interface{}{vectorParam, req.Collection}, params...)
params = append(params, req.TopK)
limitIdx := nextIdx
query := fmt.Sprintf(`
SELECT id::text, collection, content, metadata, (embedding <=> $1) AS score
FROM kb_doc_chunks
WHERE collection = $2 AND %s
ORDER BY embedding <=> $1
LIMIT $%d
`, whereSQL, limitIdx)
@@ -497,11 +716,16 @@ LIMIT $%d
for rows.Next() {
var res KbSearchResult
var metaBytes []byte
if err := rows.Scan(&res.ID, &res.Collection, &res.Content, &metaBytes, &res.Score); err != nil {
log.Printf("kb.search scan failed: %v", err)
writeError(w, http.StatusInternalServerError, "search_failed", fmt.Sprintf("failed to read search results: %v", err))
return
}
if math.IsNaN(res.Score) || math.IsInf(res.Score, 0) {
res.Score = 0
} else {
res.Score = distanceToSimilarity(res.Score)
}
if len(metaBytes) > 0 {
if err := json.Unmarshal(metaBytes, &res.Metadata); err != nil {
log.Printf("kb.search metadata unmarshal failed: %v", err)
@@ -521,6 +745,99 @@ LIMIT $%d
}
}
func handleCollections(db *pgxpool.Pool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if db == nil {
writeError(w, http.StatusInternalServerError, "db_not_configured", "DATABASE_URL not configured")
return
}
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
permissions, ok := r.Context().Value(ctxPermissionsUsers).([]string)
if !ok || len(permissions) == 0 {
writeError(w, http.StatusForbidden, "forbidden", "permissions.users not set")
return
}
permFilter := map[string]interface{}{
"contains": map[string]interface{}{
"metadata.permissions.users": permissions,
},
}
whereSQL, params, _, err := buildFilterSQL(permFilter, 1)
if err != nil {
writeError(w, http.StatusBadRequest, "invalid_request", err.Error())
return
}
query := fmt.Sprintf(`
SELECT collection, COUNT(*) AS cnt
FROM kb_doc_chunks
WHERE %s
GROUP BY collection
ORDER BY collection ASC
`, whereSQL)
rows, err := db.Query(r.Context(), query, params...)
if err != nil {
writeError(w, http.StatusInternalServerError, "collections_failed", fmt.Sprintf("failed to list collections: %v", err))
return
}
defer rows.Close()
items := make([]CollectionItem, 0)
for rows.Next() {
var item CollectionItem
if err := rows.Scan(&item.Name, &item.Count); err != nil {
writeError(w, http.StatusInternalServerError, "collections_failed", "failed to read collections")
return
}
items = append(items, item)
}
if err := rows.Err(); err != nil {
writeError(w, http.StatusInternalServerError, "collections_failed", "failed to read collections")
return
}
writeJSON(w, http.StatusOK, CollectionsResponse{Items: items})
}
}
func handleAdminCollections(db *pgxpool.Pool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if db == nil {
writeError(w, http.StatusInternalServerError, "db_not_configured", "DATABASE_URL not configured")
return
}
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
rows, err := db.Query(r.Context(), `
SELECT collection, COUNT(*) AS cnt
FROM kb_doc_chunks
GROUP BY collection
ORDER BY collection ASC
`)
if err != nil {
writeError(w, http.StatusInternalServerError, "collections_failed", fmt.Sprintf("failed to list collections: %v", err))
return
}
defer rows.Close()
items := make([]CollectionItem, 0)
for rows.Next() {
var item CollectionItem
if err := rows.Scan(&item.Name, &item.Count); err != nil {
writeError(w, http.StatusInternalServerError, "collections_failed", "failed to read collections")
return
}
items = append(items, item)
}
if err := rows.Err(); err != nil {
writeError(w, http.StatusInternalServerError, "collections_failed", "failed to read collections")
return
}
writeJSON(w, http.StatusOK, CollectionsResponse{Items: items})
}
}
func handleAdminApiKeys(store APIKeyStore) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/admin/api-keys")
@@ -657,8 +974,9 @@ const adminHTML = `<!doctype html>
<body>
<h1>pgvecter API Admin</h1>
<p class="muted">X-ADMIN-API-KEY を使って管理APIを呼び出します。</p>
<div id="status" class="muted"></div>
<section id="editSection">
<h2>接続</h2>
<div class="row">
<div>
@@ -687,6 +1005,7 @@ const adminHTML = `<!doctype html>
</div>
<button onclick="createKey()">発行</button>
<pre id="issuedKey"></pre>
<button onclick="copyIssuedKey()">コピー</button>
</section>
<section>
@@ -707,9 +1026,46 @@ const adminHTML = `<!doctype html>
</table>
</section>
<section>
<h2>コレクション一覧</h2>
<table id="collectionsTable">
<thead>
<tr>
<th>Name</th>
<th>Count</th>
</tr>
</thead>
<tbody></tbody>
</table>
</section>
<section>
<h2>編集</h2>
<div class="row">
<div>
<label>Key ID</label><br />
<input id="editId" style="width:100%" />
</div>
<div>
<label>Label</label><br />
<input id="editLabel" style="width:100%" />
</div>
<div>
<label>permissions.users改行区切り</label><br />
<textarea id="editUsers"></textarea>
</div>
</div>
<button onclick="updateKey()">更新</button>
</section>
<script>
function getBaseUrl() { return document.getElementById('baseUrl').value.trim(); }
function getAdminKey() {
const input = document.getElementById('adminKey').value.trim();
if (input) { return input; }
const params = new URLSearchParams(window.location.search);
return params.get('admin_key') || '';
}
function headers() {
return { 'Content-Type': 'application/json', 'X-ADMIN-API-KEY': getAdminKey() };
}
@@ -718,7 +1074,7 @@ const adminHTML = `<!doctype html>
}
async function loadKeys() {
const res = await fetch(getBaseUrl() + '/admin/api-keys', { headers: headers() });
if (!res.ok) { setStatus('一覧取得に失敗しました'); return; }
const data = await res.json();
const tbody = document.querySelector('#keysTable tbody');
tbody.innerHTML = '';
@@ -731,8 +1087,23 @@ const adminHTML = `<!doctype html>
'<td>' + item.status + '</td>' +
'<td>' + item.created_at + '</td>' +
'<td>' + (item.last_used_at || '') + '</td>' +
'<td><button class="revoke">失効</button> <button class="edit">編集</button></td>';
tr.querySelector('button.revoke').addEventListener('click', () => revokeKey(item.id));
tr.querySelector('button.edit').addEventListener('click', () => fillEditForm(item));
tbody.appendChild(tr);
}
}
async function loadCollections() {
const res = await fetch(getBaseUrl() + '/admin/collections', { headers: headers() });
if (!res.ok) { setStatus('コレクション取得に失敗しました', 'error'); return; }
const data = await res.json();
const tbody = document.querySelector('#collectionsTable tbody');
tbody.innerHTML = '';
for (const item of data.items || []) {
const tr = document.createElement('tr');
tr.innerHTML =
'<td>' + item.name + '</td>' +
'<td>' + item.count + '</td>';
tbody.appendChild(tr);
}
}
@@ -744,19 +1115,69 @@ const adminHTML = `<!doctype html>
headers: headers(),
body: JSON.stringify({ label: label, permissions_users: users })
});
if (!res.ok) { setStatus('発行に失敗しました'); return; }
const data = await res.json();
document.getElementById('issuedKey').textContent = 'API Key: ' + data.api_key;
setStatus('発行しました', 'ok');
await loadKeys();
await loadCollections();
}
async function revokeKey(id) {
const res = await fetch(getBaseUrl() + '/admin/api-keys/' + id + '/revoke', {
method: 'POST',
headers: headers()
});
if (!res.ok) { setStatus('失効に失敗しました', 'error'); return; }
setStatus('失効しました', 'ok');
await loadKeys();
await loadCollections();
}
function fillEditForm(item) {
document.getElementById('editId').value = item.id;
document.getElementById('editLabel').value = item.label;
document.getElementById('editUsers').value = (item.permissions_users || []).join('\n');
const section = document.getElementById('editSection');
if (section && section.scrollIntoView) {
section.scrollIntoView({ behavior: 'smooth', block: 'start' });
}
}
async function updateKey() {
const id = document.getElementById('editId').value.trim();
const label = document.getElementById('editLabel').value.trim();
const users = parseUsers(document.getElementById('editUsers').value);
if (!id) { setStatus('Key IDが必要です', 'error'); return; }
const res = await fetch(getBaseUrl() + '/admin/api-keys/' + id, {
method: 'PATCH',
headers: headers(),
body: JSON.stringify({ label: label, permissions_users: users })
});
if (!res.ok) { setStatus('更新に失敗しました', 'error'); return; }
setStatus('更新しました', 'ok');
await loadKeys();
await loadCollections();
}
function setStatus(msg, type) {
const el = document.getElementById('status');
if (!el) { return; }
el.textContent = msg;
el.style.color = type === 'error' ? '#b00020' : '#0b6e4f';
clearTimeout(window.__statusTimer);
window.__statusTimer = setTimeout(() => { el.textContent = ''; }, 3000);
}
async function copyIssuedKey() {
const text = document.getElementById('issuedKey').textContent.replace('API Key: ', '').trim();
if (!text) { setStatus('コピー対象がありません', 'error'); return; }
try {
await navigator.clipboard.writeText(text);
setStatus('APIキーをコピーしました', 'ok');
} catch (e) {
setStatus('コピーに失敗しました', 'error');
}
}
window.addEventListener('DOMContentLoaded', () => {
loadKeys();
loadCollections();
});
</script>
</body>
</html>`
@@ -953,6 +1374,159 @@ func vectorLiteral(vec []float64) string {
return "[" + strings.Join(parts, ",") + "]"
}
func distanceToSimilarity(distance float64) float64 {
// pgvector cosine distance is in [0, 2]; convert to similarity in [0, 1]
sim := 1 - (distance / 2.0)
if sim < 0 {
return 0
}
if sim > 1 {
return 1
}
return sim
}
type embeddingRequest struct {
Input string `json:"input"`
Model string `json:"model"`
EncodingFormat string `json:"encoding_format,omitempty"`
}
type embeddingResponse struct {
Data []struct {
Embedding []float64 `json:"embedding"`
} `json:"data"`
}
func createEmbedding(ctx context.Context, text string) ([]float64, error) {
return createEmbeddingWithDim(ctx, text, embeddingDim())
}
func createEmbeddingWithDim(ctx context.Context, text string, dim int) ([]float64, error) {
provider := strings.ToLower(strings.TrimSpace(os.Getenv("EMBEDDING_PROVIDER")))
llamaURL := strings.TrimSpace(os.Getenv("LLAMA_CPP_URL"))
if provider == "" && llamaURL != "" {
provider = "llamacpp"
}
if provider == "" {
provider = "openai"
}
switch provider {
case "llamacpp":
return createLlamaCppEmbedding(ctx, text, dim)
case "openai":
return createOpenAIEmbedding(ctx, text)
default:
return nil, fmt.Errorf("unsupported EMBEDDING_PROVIDER: %s", provider)
}
}
func createLlamaCppEmbedding(ctx context.Context, text string, dim int) ([]float64, error) {
baseURL := strings.TrimSpace(os.Getenv("LLAMA_CPP_URL"))
if baseURL == "" {
baseURL = "http://127.0.0.1:8092"
}
endpoint := strings.TrimRight(baseURL, "/") + "/embedding"
payload, err := json.Marshal(map[string]string{
"content": text,
})
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, strings.NewReader(string(payload)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 15 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("llama.cpp embedding request failed with status %d", resp.StatusCode)
}
var out []struct {
Embedding [][]float64 `json:"embedding"`
}
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, err
}
if len(out) == 0 || len(out[0].Embedding) == 0 || len(out[0].Embedding[0]) == 0 {
return nil, errors.New("llama.cpp embedding response is empty")
}
emb := out[0].Embedding[0]
if len(emb) != dim {
return nil, fmt.Errorf("embedding dimension mismatch: got %d, expected %d", len(emb), dim)
}
return emb, nil
}
func createOpenAIEmbedding(ctx context.Context, text string) ([]float64, error) {
apiKey := strings.TrimSpace(os.Getenv("OPENAI_API_KEY"))
if apiKey == "" {
return nil, errors.New("OPENAI_API_KEY is not set")
}
model := strings.TrimSpace(os.Getenv("EMBEDDING_MODEL"))
if model == "" {
model = "text-embedding-3-small"
}
reqBody := embeddingRequest{
Input: text,
Model: model,
EncodingFormat: "float",
}
payload, err := json.Marshal(reqBody)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "https://api.openai.com/v1/embeddings", strings.NewReader(string(payload)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)
client := &http.Client{Timeout: 15 * time.Second}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("embedding request failed with status %d", resp.StatusCode)
}
var out embeddingResponse
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, err
}
if len(out.Data) == 0 || len(out.Data[0].Embedding) == 0 {
return nil, errors.New("embedding response is empty")
}
return out.Data[0].Embedding, nil
}
func embeddingDim() int {
raw := strings.TrimSpace(os.Getenv("EMBEDDING_DIM"))
if raw == "" {
return 1024
}
n, err := strconv.Atoi(raw)
if err != nil || n <= 0 {
return 1024
}
return n
}
func logConnectionInfo(db *pgxpool.Pool) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
@@ -965,6 +1539,98 @@ func logConnectionInfo(db *pgxpool.Pool) {
log.Printf("kb.search db user=%s db=%s", user, dbname)
}
func splitIntoChunks(text string, size int, overlap int) []string {
if size <= 0 {
return []string{text}
}
text = strings.ReplaceAll(text, "\r\n", "\n")
paras := splitParagraphs(text)
var chunks []string
var buf []rune
flush := func() {
if len(buf) == 0 {
return
}
chunks = append(chunks, string(buf))
if overlap > 0 && len(buf) > overlap {
buf = append([]rune{}, buf[len(buf)-overlap:]...)
} else {
buf = buf[:0]
}
}
for _, p := range paras {
rp := []rune(p)
if len(rp) > size {
flush()
sub := splitByRunes(p, size, overlap)
chunks = append(chunks, sub...)
buf = buf[:0]
continue
}
if len(buf)+len(rp) > size {
flush()
}
if len(buf) > 0 {
buf = append(buf, '\n', '\n')
}
buf = append(buf, rp...)
}
flush()
if len(chunks) == 0 {
return []string{""}
}
return chunks
}
func splitParagraphs(text string) []string {
lines := strings.Split(text, "\n")
var paras []string
var cur []string
for _, line := range lines {
if strings.TrimSpace(line) == "" {
if len(cur) > 0 {
paras = append(paras, strings.Join(cur, "\n"))
cur = cur[:0]
}
continue
}
cur = append(cur, line)
}
if len(cur) > 0 {
paras = append(paras, strings.Join(cur, "\n"))
}
return paras
}
func splitByRunes(text string, size int, overlap int) []string {
r := []rune(text)
if len(r) <= size {
return []string{text}
}
if overlap < 0 {
overlap = 0
}
if overlap >= size {
overlap = size / 4
}
var out []string
for i := 0; i < len(r); {
end := i + size
if end > len(r) {
end = len(r)
}
out = append(out, string(r[i:end]))
if end == len(r) {
break
}
i = end - overlap
if i < 0 {
i = 0
}
}
return out
}
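The sliding-window behavior of `splitByRunes` can be sanity-checked in isolation. The sketch below copies the function verbatim from above and runs it on a short illustrative input (the sample string and sizes are made up for the demo):

```go
package main

import "fmt"

// splitByRunes is copied from main.go above: it windows a string into
// size-rune chunks, carrying `overlap` runes between adjacent chunks.
func splitByRunes(text string, size int, overlap int) []string {
	r := []rune(text)
	if len(r) <= size {
		return []string{text}
	}
	if overlap < 0 {
		overlap = 0
	}
	if overlap >= size {
		overlap = size / 4
	}
	var out []string
	for i := 0; i < len(r); {
		end := i + size
		if end > len(r) {
			end = len(r)
		}
		out = append(out, string(r[i:end]))
		if end == len(r) {
			break
		}
		i = end - overlap
		if i < 0 {
			i = 0
		}
	}
	return out
}

func main() {
	// 10 runes, window of 4, overlap of 1: each chunk repeats the
	// last rune of the previous one.
	chunks := splitByRunes("abcdefghij", 4, 1)
	fmt.Println(chunks) // [abcd defg ghij]
}
```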
func initStore() (APIKeyStore, *pgxpool.Pool, func(), error) {
dbURL := os.Getenv("DATABASE_URL")
if strings.TrimSpace(dbURL) == "" {
@@ -1007,12 +1673,14 @@ func main() {
mux.HandleFunc("/health", handleHealth)
mux.Handle("/db/query", withAPIKeyAuth(store, http.HandlerFunc(handleNotImplemented)))
mux.Handle("/kb/upsert", withAPIKeyAuth(store, handleKbUpsert(db)))
mux.Handle("/kb/search", withAPIKeyAuth(store, handleKbSearch(db)))
mux.Handle("/collections", withAPIKeyAuth(store, handleCollections(db)))
adminHandler := withAdminAuth(adminKey, http.HandlerFunc(handleAdminApiKeys(store)))
mux.Handle("/admin/api-keys", adminHandler)
mux.Handle("/admin/api-keys/", adminHandler)
mux.Handle("/admin/collections", withAdminAuth(adminKey, handleAdminCollections(db)))
mux.Handle("/admin", withAdminUIAuth(adminKey, http.HandlerFunc(handleAdminUI)))
port := os.Getenv("PORT")


@@ -0,0 +1,5 @@
ALTER TABLE kb_doc_chunks
ADD COLUMN IF NOT EXISTS collection text NOT NULL DEFAULT 'default';
CREATE INDEX IF NOT EXISTS kb_doc_chunks_collection_idx
ON kb_doc_chunks (collection);
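With the `collection` column and its index in place, the collection listings returned by `/collections` and `/admin/collections` can presumably be served by a simple aggregate along these lines (an assumption — the handler bodies are outside this hunk):

```sql
-- Hypothetical query backing CollectionsResponse: one row per collection
-- with its chunk count, grouped on the indexed column added above.
SELECT collection AS name, COUNT(*) AS count
FROM kb_doc_chunks
GROUP BY collection
ORDER BY collection;
```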


@@ -92,6 +92,42 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
/collections:
get:
summary: List collections
operationId: listCollections
responses:
"200":
description: Collections list
content:
application/json:
schema:
$ref: "#/components/schemas/CollectionsResponse"
"401":
description: Unauthorized
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
/admin/collections:
get:
summary: List collections (admin)
operationId: listCollectionsAdmin
security:
- AdminApiKeyAuth: []
responses:
"200":
description: Collections list
content:
application/json:
schema:
$ref: "#/components/schemas/CollectionsResponse"
"401":
description: Unauthorized
content:
application/json:
schema:
$ref: "#/components/schemas/ErrorResponse"
/admin/api-keys:
get:
summary: List API keys (admin)
@@ -261,11 +297,34 @@ components:
id:
type: string
description: Document ID
doc_id:
type: string
description: Parent document ID (defaults to id)
chunk_index:
type: integer
description: Chunk index (defaults to 0)
collection:
type: string
description: Collection name (default "default")
auto_chunk:
type: boolean
description: If true, server splits content into chunks and upserts multiple rows.
chunk_size:
type: integer
description: Chunk size in characters (default 800).
chunk_overlap:
type: integer
description: Overlap size in characters (default 100).
content:
type: string
metadata:
type: object
additionalProperties: true
embedding:
type: array
items:
type: number
description: Embedding vector; length must match EMBEDDING_DIM (default 1024). If omitted, the server generates one via llama.cpp.
required:
- id
- content
@@ -274,10 +333,25 @@ components:
properties:
id:
type: string
doc_id:
type: string
updated:
type: boolean
chunks:
type: array
items:
type: object
properties:
id:
type: string
chunk_index:
type: integer
required:
- id
- chunk_index
required:
- id
- doc_id
- updated
KbSearchRequest:
type: object
@@ -288,12 +362,15 @@ components:
type: array
items:
type: number
description: Optional precomputed embedding vector. If omitted, the server generates one via llama.cpp.
top_k:
type: integer
default: 5
minimum: 1
maximum: 50
collection:
type: string
description: Collection name (default "default")
filter:
type: object
additionalProperties: true
@@ -316,6 +393,8 @@ components:
properties:
id:
type: string
collection:
type: string
content:
type: string
metadata:
@@ -324,10 +403,31 @@ components:
score:
type: number
format: float
description: Similarity score (0-1). Converted from cosine distance.
required:
- id
- collection
- content
- score
CollectionsResponse:
type: object
properties:
items:
type: array
items:
$ref: "#/components/schemas/CollectionItem"
required:
- items
CollectionItem:
type: object
properties:
name:
type: string
count:
type: integer
required:
- name
- count
AdminApiKey:
type: object
properties:

pgvecter-api.service

@@ -0,0 +1,16 @@
[Unit]
Description=pgvecter API
After=network.target
[Service]
Type=simple
WorkingDirectory=/opt/pgvecterAPI
EnvironmentFile=/opt/pgvecterAPI/.env
ExecStart=/opt/pgvecterAPI/pgvecter-api
Restart=always
RestartSec=3
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
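Assuming the paths baked into the unit above (`/opt/pgvecterAPI`, an `.env` file holding the environment variables from the README), a typical install sequence would be the following sketch — adjust paths to your deployment:

```
# Install the unit and start the service (paths assume the unit file above).
sudo cp pgvecter-api.service /etc/systemd/system/pgvecter-api.service
sudo systemctl daemon-reload
sudo systemctl enable --now pgvecter-api
journalctl -u pgvecter-api -f   # follow logs (the unit logs to the journal)
```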