Asynchronous programming in Keyramid

UC, Irvine의 SDCL에서 학부생 방문 연구원으로 일하며 크라우드소싱을 통한 소프트웨어 디자인 워크 플로우를 설계하고, 이를 실험하기 위한 웹 어플리케이션을 개발하고 있다. 실험은 아마존 메카니컬 터크(AMT, 우리가 아는 그 Amazon이다… 이 친구들은 크라우드 소싱도 하고 있었다.)에서 실제 크라우드 워커들을 상대로 진행되며, 참여자들에게 실제 현금 보상 또한 이루어진다. 이 임금 지불은 AMT 플랫폼을 통해 진행되며, 누구에게 얼마를 왜 주는지 Authentication을 가진 계정으로 API를 통해 요청하면 등록된 카드를 통해 결제가 이루어 진다.

 

Situation

스크린샷 2017-11-10 오후 3.00.17

위와 같은 엔딩 페이지에서, 이 HIT(작업, 일감?)의 마지막 작업자가 Submit 버튼을 누르면 자신의 작업(Assignment)을 종료하고 그 결과를 AMT에 전송한다. 우리는 자체 로직에 의해 작업자들의 결과를 평가하고 보상하기 위해 우리의 어플리케이션 서버에도 해당 Submit 요청을 전송한다.

스크린샷 2017-11-10 오후 2.42.04

Submit 요청이 전송된 후에 일어나야 하는 일은 다음 순서와 같다.

  1. AMT는 해당 작업자의 Assignment 상태를 “Submitted”로 업데이트 한다.
  2. 어플리케이션 서버(Keyramid)는 작업자의 결과를 평가해 보상 비용을 결정하고 해당 비용을 지불해줄 것을 AMT에 요청한다.
  3. AMT는 어플리케이션 서버의 요청을 받고 해당 Assignment의 Status가 “Submitted”라면 요청된 보상 지불을 시행한다.

스크린샷 2017-11-10 오후 2.42.34

하지만 무슨 일인지 AMT api 서버 측의 데이터 업데이트가 매우 느린 것 같다. (이는 api를 통해 HIT(작업)을 생성했을 때도 확인할 수 있다. 생성도 되었고, 링크도 주어졌지만, 검색 목록에까지 나오는데 수 분이 걸린다.) 그래서 다음과 같은 순서로 작업이 진행되며 버그가 발생한다.

  1. 어플리케이션 서버(Keyramid)는 작업자의 결과를 평가해 보상 비용을 결정하고 해당 비용을 지불해줄 것을 AMT에 요청한다.
  2. AMT는 어플리케이션 서버의 요청을 받고 해당 Assignment의 Status가 “Submitted”라면 요청된 보상 지불을 시행한다.
  3. AMT는 해당 작업자의 Assignment 상태를 “Submitted”로 업데이트 한다.

AMT가 보상 비용 지불 요청을 받았을 때, 해당 Assignment의 상태가 아직 “Submitted”가 아니기 때문에 해당 상태에선 작업을 실행할 수 없다는 예외를 발생시킨다. 그리고 무엇보다 작업자에게 임금이 지불되지 않는다. 이는 매우 민감한 문제다.

Solution

결국 임금을 지불하기 위해서는, AMT에서 해당 Assignment의 Status가 “Submitted”로 바뀌었을 때 임금 지불을 요청해야 한다. 하지만 AMT에서 먼저 Status 변경을 알려주는 이벤트가 없으므로 우리 어플리케이션이 직접 확인해야 한다.

While 문을 돌면서 지속적으로 확인을 해야하는데, 무한 루프에서 발생하는 그 CPU 낭비와, Sleep에서 오는 지루한 시간 지연을 어떡할 것인가? 100명의 작업자에게 임금을 지불하기 위해서는 평균 10초 정도 Sleep해야 된다고 가정하면, 1000초의 시간 낭비와 CPU 낭비가 발생한다. 나는 이렇게 낭비적인 프로그래밍을 하고 싶지 않았다. 그래서 비동기적으로 이 문제를 해결하기로 했다. 실제 코드 로직이 돌아가는 시간이 Blocking되는 시간보다 훨씬 짧은 I/O Bound process이기 때문에 Single thread 기반으로도 충분히 해결할 수 있다고 생각했다.

%e1%84%89%e1%85%b3%e1%84%8f%e1%85%b3%e1%84%85%e1%85%b5%e1%86%ab%e1%84%89%e1%85%a3%e1%86%ba-2016-11-24-%e1%84%8b%e1%85%a9%e1%84%92%e1%85%ae-10-09-43
편의를 위해 Node.js를 설명하기 위한 그림으로…

내가 100명의 작업자에 대한 임금 지불을 비동기적으로 실행하면 각각의 작업자에 대한 임금 지불이 Event queue에 들어간다. A 작업자의 Assignment Status를 확인하기 위한 API Call을 보낸다. 해당 콜의 응답 또한 그리 빠르지 않기 때문에 ThreadPool에서 워커를 꺼내와 해당 작업을 맡긴다. 그리고 Event queue의 다음 작업자 B에 대한 요청을 실행한다. 첫번째 작업자에 대한 api 콜이 끝나면, 다시 Event queue에 들어가 순서를 기다린다. Queue에서 A 작업자의 차례가 다시 오면, Status가 “Submitted”인지 확인하고 아니면 5초를 Sleep하며 다시 Event queue의 뒤로 들어간다. 이 과정을 반복하다 Status가 “Submitted”가 되면 반복문을 탈출해 Pay를 요청한다. 이러한아닌 단일 쓰레드와 몇 개의 쓰레드로 구성 된 쓰레드 풀로 구성된 비동기적 설계를 통해 100개의 멀티쓰레드 혹은 동기적 단일 쓰레드로 문제를 해결할 때보다 훨씬 빠르고 자원 효율적으로 문제를 빠르게 해결할 수 있다.

Implementation

from botocore.exceptions import ClientError
from concurrent.futures import ThreadPoolExecutor
from keyramid.celeryconf import app
from .client import api
import asyncio
from collections import namedtuple


class Bill(namedtuple('Bill', ['worker_id', 'assignment_id', 'amount'])):
    @staticmethod
    def _get_assignment(assignment_id):
        return api.get_assignment(AssignmentId=assignment_id)

    @staticmethod
    def _send_bonus(worker_id, assignment_id, bonus_amount):
        return api.send_bonus(WorkerId=worker_id,
                              AssignmentId=assignment_id,
                              BonusAmount="{:.2f}".format(bonus_amount),
                              Reason="Matched with the crowd's result")

    async def _approve(self, assignment_id, loop):
        assignment = {'AssignmentStatus': None}
        await asyncio.sleep(10)
        with ThreadPoolExecutor(max_workers=5) as executor:
            while assignment['AssignmentStatus'] != 'Submitted':
                try:
                    res = await loop.run_in_executor(executor, self._get_assignment, assignment_id)
                    assignment = res['Assignment']
                except ClientError:
                    pass
                finally:
                    print(assignment['AssignmentStatus'])
                    await asyncio.sleep(5)
        res = api.approve_assignment(AssignmentId=assignment['AssignmentId'])
        print(res)

    async def _pay_bonus(self, worker_id, bonus_amount, assignment_id, loop):
        if bonus_amount:
            with ThreadPoolExecutor(max_workers=1) as executor:
                await loop.run_in_executor(executor, self._send_bonus, worker_id, assignment_id, bonus_amount)

    async def pay(self, loop):
        await self._approve(self.assignment_id, loop)
        await self._pay_bonus(self.worker_id,
                              self.amount,
                              self.assignment_id,
                              loop)


@app.task
def pay_bills(bills):
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.gather(
        *[Bill(*bill).pay(loop) for bill in bills]
    ))


Asynchronous programming in Keyramid”의 1개의 생각

  1. 좋은 정보 얻어갑니다.

    좋아요

댓글 남기기