세션 풀에서 라운드로빈 방식으로 세션을 획득하도록 개선하였으며, 인페인팅 이미지 API에 요청 ID를 추가하여 배치 처리 시 로깅 정보를 강화하였습니다. 배치 처리 로그에 요청 ID를 포함하여 VRAM 사용량 및 세션 상태를 보다 명확히 기록하도록 수정하였습니다.
This commit is contained in:
parent
9619fbc1db
commit
907d28c8bf
|
|
@ -346,6 +346,7 @@ async def inpaint_image(
|
||||||
job_data = {
|
job_data = {
|
||||||
"image": image,
|
"image": image,
|
||||||
"mask": mask,
|
"mask": mask,
|
||||||
|
"request_id": req_id,
|
||||||
"prompt": request.prompt,
|
"prompt": request.prompt,
|
||||||
"negative_prompt": request.negative_prompt,
|
"negative_prompt": request.negative_prompt,
|
||||||
"sd_seed": request.sd_seed,
|
"sd_seed": request.sd_seed,
|
||||||
|
|
|
||||||
|
|
@ -52,6 +52,8 @@ class SessionPool:
|
||||||
self.conditions: Dict[ModelType, asyncio.Condition] = {
|
self.conditions: Dict[ModelType, asyncio.Condition] = {
|
||||||
mt: asyncio.Condition() for mt in ModelType
|
mt: asyncio.Condition() for mt in ModelType
|
||||||
}
|
}
|
||||||
|
# 세션 획득 라운드로빈 인덱스
|
||||||
|
self._rr_index: Dict[ModelType, int] = {mt: 0 for mt in ModelType}
|
||||||
self._initialized = False
|
self._initialized = False
|
||||||
self._reaper_task: Optional[asyncio.Task] = None
|
self._reaper_task: Optional[asyncio.Task] = None
|
||||||
|
|
||||||
|
|
@ -180,12 +182,19 @@ class SessionPool:
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
async with condition:
|
async with condition:
|
||||||
for session in self.pools[model_type]:
|
pool = self.pools[model_type]
|
||||||
|
if pool:
|
||||||
|
start_idx = self._rr_index[model_type] % len(pool)
|
||||||
|
for i in range(len(pool)):
|
||||||
|
idx = (start_idx + i) % len(pool)
|
||||||
|
session = pool[idx]
|
||||||
if not session.in_use:
|
if not session.in_use:
|
||||||
session.in_use = True
|
session.in_use = True
|
||||||
session.mark_used()
|
session.mark_used()
|
||||||
total = len(self.pools[model_type])
|
# 다음 라운드로빈 시작 인덱스 갱신
|
||||||
in_use = sum(1 for s in self.pools[model_type] if s.in_use)
|
self._rr_index[model_type] = (idx + 1) % len(pool)
|
||||||
|
total = len(pool)
|
||||||
|
in_use = sum(1 for s in pool if s.in_use)
|
||||||
logger.info(f"[{model_type.value}] acquire {session.session_id} (in_use={in_use}/{total})")
|
logger.info(f"[{model_type.value}] acquire {session.session_id} (in_use={in_use}/{total})")
|
||||||
return session
|
return session
|
||||||
|
|
||||||
|
|
@ -207,6 +216,9 @@ class SessionPool:
|
||||||
self.pools[model_type].append(new_session)
|
self.pools[model_type].append(new_session)
|
||||||
self._log_pool_status("create", model_type.value) # 위치 변경
|
self._log_pool_status("create", model_type.value) # 위치 변경
|
||||||
logger.info(f"Acquired new session {new_session.session_id} as VRAM is sufficient ({free_vram_ratio:.2f} > {settings.SESSION_VRAM_THRESHOLD:.2f})")
|
logger.info(f"Acquired new session {new_session.session_id} as VRAM is sufficient ({free_vram_ratio:.2f} > {settings.SESSION_VRAM_THRESHOLD:.2f})")
|
||||||
|
# 새 세션 추가 후 라운드로빈 인덱스를 다음으로 이동
|
||||||
|
pool = self.pools[model_type]
|
||||||
|
self._rr_index[model_type] = (len(pool) - 1 + 1) % len(pool)
|
||||||
return new_session
|
return new_session
|
||||||
except Exception:
|
except Exception:
|
||||||
# 세션 생성 실패 시 루프를 계속하여 다시 시도하거나 대기
|
# 세션 생성 실패 시 루프를 계속하여 다시 시도하거나 대기
|
||||||
|
|
|
||||||
|
|
@ -374,6 +374,7 @@ class WorkerManager:
|
||||||
model_type = ModelType.SIMPLE_LAMA
|
model_type = ModelType.SIMPLE_LAMA
|
||||||
stats_model_key = 'simple_lama'
|
stats_model_key = 'simple_lama'
|
||||||
batch_size = len(batch_data)
|
batch_size = len(batch_data)
|
||||||
|
request_ids = [d.get('request_id') for d in batch_data if isinstance(d, dict)]
|
||||||
|
|
||||||
async with session_pool.get_session(model_type) as session:
|
async with session_pool.get_session(model_type) as session:
|
||||||
|
|
||||||
|
|
@ -385,7 +386,8 @@ class WorkerManager:
|
||||||
session_id = getattr(session, 'session_id', 'unknown')
|
session_id = getattr(session, 'session_id', 'unknown')
|
||||||
logger.info(
|
logger.info(
|
||||||
f"🧠[simple_lama][{session_id}] Batch Start (id={batch_id or '-'}, size={batch_size}) "
|
f"🧠[simple_lama][{session_id}] Batch Start (id={batch_id or '-'}, size={batch_size}) "
|
||||||
f"VRAM: {used:.1f}/{total:.1f} {unit or 'GiB'} ({usage_percent:.1f}%)"
|
f"VRAM: {used:.1f}/{total:.1f} {unit or 'GiB'} ({usage_percent:.1f}%), "
|
||||||
|
f"req_ids={request_ids}"
|
||||||
)
|
)
|
||||||
|
|
||||||
start_time = time.time()
|
start_time = time.time()
|
||||||
|
|
@ -409,7 +411,7 @@ class WorkerManager:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"✅[simple_lama][{session_id}] Batch End (id={batch_id or '-'}, size={batch_size}) "
|
f"✅[simple_lama][{session_id}] Batch End (id={batch_id or '-'}, size={batch_size}) "
|
||||||
f"VRAM: {used_after:.1f}/{total_after:.1f} {unit_after or 'GiB'} ({usage_percent_after:.1f}%) | "
|
f"VRAM: {used_after:.1f}/{total_after:.1f} {unit_after or 'GiB'} ({usage_percent_after:.1f}%) | "
|
||||||
f"Duration: {duration:.3f}s"
|
f"Duration: {duration:.3f}s, req_ids={request_ids}"
|
||||||
)
|
)
|
||||||
|
|
||||||
# 통계 기록: 배치 전체 처리 시간 / 배치 크기
|
# 통계 기록: 배치 전체 처리 시간 / 배치 크기
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,64 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import re
|
||||||
|
import json
|
||||||
|
import argparse
|
||||||
|
|
||||||
|
PAT_BATCH_START = re.compile(r"Batch Start \(id=(?P<bid>[^,]+), size=(?P<size>\d+)\).+req_ids=\[(?P<reqs>[^\]]*)\]")
|
||||||
|
PAT_META = re.compile(r"\[(INPAINT_META|REMOVEBG_META)\] (\{.*\})")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_req_ids(s: str):
|
||||||
|
s = s.strip()
|
||||||
|
if not s:
|
||||||
|
return []
|
||||||
|
return [x.strip("' \"") for x in s.split(',') if x.strip()]
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ap = argparse.ArgumentParser()
|
||||||
|
ap.add_argument("--log", required=True, help="path to main_size.log")
|
||||||
|
args = ap.parse_args()
|
||||||
|
|
||||||
|
batches = []
|
||||||
|
metas = []
|
||||||
|
with open(args.log, "r", encoding="utf-8") as f:
|
||||||
|
for line in f:
|
||||||
|
m1 = PAT_BATCH_START.search(line)
|
||||||
|
if m1:
|
||||||
|
reqs = parse_req_ids(m1.group("reqs"))
|
||||||
|
batches.append({
|
||||||
|
"batch_id": m1.group("bid"),
|
||||||
|
"size": int(m1.group("size")),
|
||||||
|
"request_ids": reqs,
|
||||||
|
"line": line.strip(),
|
||||||
|
})
|
||||||
|
m2 = PAT_META.search(line)
|
||||||
|
if m2:
|
||||||
|
try:
|
||||||
|
metas.append(json.loads(m2.group(2)))
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# 매핑: request_id -> meta
|
||||||
|
req_meta = {}
|
||||||
|
for m in metas:
|
||||||
|
rid = m.get("request_id")
|
||||||
|
if rid:
|
||||||
|
req_meta.setdefault(rid, []).append(m)
|
||||||
|
|
||||||
|
# 결과 요약 출력
|
||||||
|
print("Batches:", len(batches))
|
||||||
|
for b in batches[-20:]:
|
||||||
|
mapped = {rid: req_meta.get(rid, []) for rid in b["request_ids"]}
|
||||||
|
print(json.dumps({
|
||||||
|
"batch_id": b["batch_id"],
|
||||||
|
"size": b["size"],
|
||||||
|
"request_ids": b["request_ids"],
|
||||||
|
"meta_found": {k: len(v) for k, v in mapped.items()},
|
||||||
|
}, ensure_ascii=False))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,67 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
import asyncio
|
||||||
|
import aiohttp
|
||||||
|
import base64
|
||||||
|
import time
|
||||||
|
import json
|
||||||
|
from typing import List
|
||||||
|
|
||||||
|
API_URL = "http://127.0.0.1:8008/api/v1/inpaint"
|
||||||
|
|
||||||
|
|
||||||
|
def img_to_b64(path: str) -> str:
|
||||||
|
with open(path, "rb") as f:
|
||||||
|
return "data:image/png;base64," + base64.b64encode(f.read()).decode("utf-8")
|
||||||
|
|
||||||
|
|
||||||
|
async def post_inpaint(session: aiohttp.ClientSession, image_b64: str, mask_b64: str):
|
||||||
|
payload = {
|
||||||
|
"image": image_b64,
|
||||||
|
"mask": mask_b64,
|
||||||
|
"model_name": "simple-lama",
|
||||||
|
}
|
||||||
|
t0 = time.time()
|
||||||
|
async with session.post(API_URL, json=payload) as resp:
|
||||||
|
_ = await resp.read()
|
||||||
|
dt = time.time() - t0
|
||||||
|
return resp.status, dt
|
||||||
|
|
||||||
|
|
||||||
|
async def run_scenario(concurrency: int, total: int, img_path: str, mask_path: str):
|
||||||
|
image_b64 = img_to_b64(img_path)
|
||||||
|
mask_b64 = img_to_b64(mask_path)
|
||||||
|
async with aiohttp.ClientSession() as session:
|
||||||
|
tasks = []
|
||||||
|
# 워밍업: 배치 2를 유도하기 위해 빠르게 2개
|
||||||
|
for _ in range(2):
|
||||||
|
tasks.append(asyncio.create_task(post_inpaint(session, image_b64, mask_b64)))
|
||||||
|
results = await asyncio.gather(*tasks)
|
||||||
|
print("Warmup:", results)
|
||||||
|
|
||||||
|
# 본 테스트: 동시성(concurrency)로 total개 요청 보내기
|
||||||
|
pending = 0
|
||||||
|
idx = 0
|
||||||
|
inflight: List[asyncio.Task] = []
|
||||||
|
latencies = []
|
||||||
|
while idx < total or inflight:
|
||||||
|
while idx < total and len(inflight) < concurrency:
|
||||||
|
inflight.append(asyncio.create_task(post_inpaint(session, image_b64, mask_b64)))
|
||||||
|
idx += 1
|
||||||
|
done, inflight = await asyncio.wait(inflight, return_when=asyncio.FIRST_COMPLETED)
|
||||||
|
for d in done:
|
||||||
|
code, dt = d.result()
|
||||||
|
latencies.append(dt)
|
||||||
|
print("count=", len(latencies), "avg=", sum(latencies)/len(latencies), "max=", max(latencies))
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
import argparse
|
||||||
|
p = argparse.ArgumentParser()
|
||||||
|
p.add_argument("--img", required=True)
|
||||||
|
p.add_argument("--mask", required=True)
|
||||||
|
p.add_argument("--concurrency", type=int, default=2)
|
||||||
|
p.add_argument("--total", type=int, default=10)
|
||||||
|
args = p.parse_args()
|
||||||
|
asyncio.run(run_scenario(args.concurrency, args.total, args.img, args.mask))
|
||||||
|
|
||||||
|
|
||||||
Binary file not shown.
|
After Width: | Height: | Size: 3.2 KiB |
Binary file not shown.
|
After Width: | Height: | Size: 6.8 KiB |
Loading…
Reference in New Issue