Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #156 from facebookresearch/mturk
Browse files Browse the repository at this point in the history
Allow multiple assignments per HIT
  • Loading branch information
yf225 authored Jun 21, 2017
2 parents 7d1a2ea + 3303950 commit 3d86ebe
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 126 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ The mturk library contains the following directories:

To run an MTurk task:
- Go into the directory for the task you want to run.
- Run `python run.py -nh <num_hits> -r <reward> [--sandbox]/[--live]`, with `<num_hits>` and `<reward>` set appropriately. Use `--sandbox` to run the task in MTurk sandbox mode before pushing it live.
- Run `python run.py -nh <num_hits> -na <num_assignments> -r <reward> [--sandbox]/[--live]`, with `<num_hits>`, `<num_assignments>` and `<reward>` set appropriately. Use `--sandbox` to run the task in MTurk sandbox mode before pushing it live.

To add your own MTurk task:
- create a new folder within the mturk/tasks directory for your new task
Expand Down
6 changes: 3 additions & 3 deletions docs/source/mturk.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ Then, to run an MTurk task, first ensure that the task directory is in `parlai/m

.. code-block:: bash
python run.py -nh <num_hits> -r <reward> [--sandbox]/[--live]
python run.py -nh <num_hits> -na <num_assignments> -r <reward> [--sandbox]/[--live]
E.g. to create 2 HITs for the `QA Data Collection <https://github.com/facebookresearch/ParlAI/blob/master/parlai/mturk/tasks/qa_data_collection/>`__ example with $0.05 each in sandbox mode, first go into the task directory and then run:
E.g. to create 2 HITs for the `QA Data Collection <https://github.com/facebookresearch/ParlAI/blob/master/parlai/mturk/tasks/qa_data_collection/>`__ example with 1 assignment per HIT and $0.05 per assignment in sandbox mode, first go into the task directory and then run:

.. code-block:: bash
python run.py -nh 2 -r 0.05 --sandbox
python run.py -nh 2 -na 1 -r 0.05 --sandbox
Please make sure to test your task in MTurk sandbox mode first (``--sandbox``) before pushing it live (``--live``).

Expand Down
3 changes: 3 additions & 0 deletions parlai/core/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ def add_mturk_args(self):
self.add_argument(
'-nh', '--num-hits', default=2, type=int,
help='number of HITs you want to create for this task')
self.add_argument(
'-na', '--num-assignments', default=1, type=int,
help='number of assignments for each HIT')
self.add_argument(
'-r', '--reward', default=0.05, type=float,
help='reward for each HIT, in US dollars')
Expand Down
77 changes: 39 additions & 38 deletions parlai/mturk/core/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def __init__(self):
self.local_db_file_path = None
self.run_id = None
self.mturk_agent_ids = None
self.all_agent_ids = None

def init_aws(self, opt):
self.run_id = str(int(time.time()))
Expand All @@ -59,7 +60,7 @@ def init_aws(self, opt):
return

print('Setting up MTurk backend...')
html_api_endpoint_url, json_api_endpoint_url, requester_key_gt = setup_aws(task_description=opt['task_description'], num_hits=opt['num_hits'], is_sandbox=opt['is_sandbox'])
html_api_endpoint_url, json_api_endpoint_url, requester_key_gt = setup_aws(task_description=opt['task_description'], num_hits=opt['num_hits'], num_assignments=opt['num_assignments'], is_sandbox=opt['is_sandbox'])
self.html_api_endpoint_url = html_api_endpoint_url
self.json_api_endpoint_url = json_api_endpoint_url
self.requester_key_gt = requester_key_gt
Expand Down Expand Up @@ -104,7 +105,11 @@ def get_new_messages_and_save_to_db(self):
'last_message_id': self.db_last_message_id,
}
request = requests.get(self.json_api_endpoint_url, params=params)
ret = json.loads(request.json())
try:
ret = json.loads(request.json())
except TypeError as e:
print(request.json())
raise e
conversation_dict = ret['conversation_dict']
if ret['last_message_id']:
self.db_last_message_id = ret['last_message_id']
Expand Down Expand Up @@ -152,7 +157,12 @@ def send_new_message(self, task_group_id, conversation_id, agent_id, message_tex
post_data_dict['reward'] = reward

request = requests.post(self.json_api_endpoint_url, data=json.dumps(post_data_dict))
return json.loads(request.json())
try:
ret = json.loads(request.json())
return ret
except TypeError as e:
print(request.json())
raise e

def get_approval_status_count(self, task_group_id, approval_status, requester_key, conversation_id=None):
params = {
Expand All @@ -166,9 +176,32 @@ def get_approval_status_count(self, task_group_id, approval_status, requester_ke
request = requests.get(self.json_api_endpoint_url, params=params)
return request.json()

# Create the MTurk HITs for the current task group.
#
# Iterates over self.mturk_agent_ids and over range(1, opt['num_hits']+1),
# calling create_hit_type (while holding create_hit_type_lock) and
# create_hit_with_hit_type with the chat page URL and opt['num_assignments'].
# Reads from opt: 'num_hits', 'hit_title', 'hit_description', 'hit_keywords',
# 'reward', 'is_sandbox', 'num_assignments' and, optionally,
# 'task_additional_info' (defaults to '').
# Prints progress messages and the HIT page link; the function has no return
# statement.
# NOTE(review): block nesting is not visible here -- confirm which loop level
# the two trailing print calls belong to before changing this function.
def create_hits(self, opt):
print('Creating HITs...')
for mturk_agent_id in self.mturk_agent_ids:
# hit_index itself is not referenced by any statement below.
for hit_index in range(1, opt['num_hits']+1):
# create_hit_type is only ever called with create_hit_type_lock held.
with create_hit_type_lock:
hit_type_id = create_hit_type(
hit_title=opt['hit_title'],
# The task group id and the agent role are appended to the description text.
hit_description=opt['hit_description'] + ' (ID: ' + self.task_group_id + ', Role: ' + mturk_agent_id + ')',
hit_keywords=opt['hit_keywords'],
hit_reward=opt['reward'],
is_sandbox=opt['is_sandbox']
)
# Render the id list with double quotes instead of single quotes before
# embedding it in the query string (presumably so the page can parse it as
# JSON -- TODO confirm against the HTML endpoint).
all_agent_ids_string = str(self.all_agent_ids).replace("'", '''"''')
# The query string is assembled by plain concatenation; no URL-encoding is applied here.
mturk_chat_url = self.html_api_endpoint_url + "?method_name=chat_index&task_group_id="+str(self.task_group_id)+"&all_agent_ids="+all_agent_ids_string+"&cur_agent_id="+str(mturk_agent_id)+"&task_additional_info="+str(opt.get('task_additional_info', ''))
mturk_page_url = create_hit_with_hit_type(
page_url=mturk_chat_url,
hit_type_id=hit_type_id,
num_assignments=opt['num_assignments'],
is_sandbox=opt['is_sandbox']
)
print("Link to HIT for " + str(mturk_agent_id) + ": " + mturk_page_url + "\n")
print("Waiting for Turkers to respond... (Please don't close your laptop or put your computer into sleep or standby mode.)\n")

def review_hits(self):
mturk_agent_ids_string = str(self.mturk_agent_ids).replace("'", '''"''')
mturk_approval_url = self.html_api_endpoint_url + "?method_name=approval_index&task_group_id="+str(self.task_group_id)+"&conversation_id=1"+"&mturk_agent_ids="+mturk_agent_ids_string+"&requester_key="+self.requester_key_gt
mturk_approval_url = self.html_api_endpoint_url + "?method_name=approval_index&task_group_id="+str(self.task_group_id)+"&hit_index=1&assignment_index=1&mturk_agent_ids="+mturk_agent_ids_string+"&requester_key="+self.requester_key_gt

print("\nAll HITs are done! Please go to the following link to approve/reject them (or they will be auto-approved in 4 weeks if no action is taken):\n")
print(mturk_approval_url)
Expand All @@ -194,41 +227,9 @@ def __init__(self, id, manager, conversation_id, opt, shared=None):
self.manager = manager
self.id = id
self.last_message_id = 0
self.mturk_agent_ids = None
self.all_agent_ids = None

self.is_sandbox = opt['is_sandbox']
self.hit_reward = opt['reward']
self.hit_title = opt['hit_title']
self.hit_description = opt['hit_description']
self.hit_keywords = opt['hit_keywords']
self.task_additional_info = opt.get('task_additional_info', '')

def create_hit(self):
print('Creating HITs...')
with create_hit_type_lock:
hit_type_id = create_hit_type(
hit_title=self.hit_title,
hit_description=self.hit_description + ' (ID: ' + self.manager.task_group_id + ', Role: ' + self.id + ')',
hit_keywords=self.hit_keywords,
hit_reward=self.hit_reward,
is_sandbox=self.is_sandbox
)
all_agent_ids_string = str(self.all_agent_ids).replace("'", '''"''')
mturk_chat_url = self.manager.html_api_endpoint_url + "?method_name=chat_index&task_group_id="+str(self.manager.task_group_id)+"&conversation_id="+str(self.conversation_id)+"&all_agent_ids="+all_agent_ids_string+"&cur_agent_id="+str(self.id)+"&task_additional_info="+str(self.task_additional_info)
mturk_page_url = create_hit_with_hit_type(
page_url=mturk_chat_url,
hit_type_id=hit_type_id,
is_sandbox=self.is_sandbox
)
print("Link to HIT for " + self.id + ": " + mturk_page_url + "\n")
print("Waiting for Turkers to respond... (Please don't close your laptop or put your computer into sleep or standby mode.)\n")

# Notify manager of current configuration
self.manager.mturk_agent_ids = self.mturk_agent_ids

def observe(self, msg):
if msg['id'] not in self.mturk_agent_ids: # If the message sender is an mturk agent, then there is no need to upload this message to db since it's already been done on the message sender side.
if msg['id'] not in self.manager.mturk_agent_ids: # If the message sender is an mturk agent, then there is no need to upload this message to db since it's already been done on the message sender side.
self.manager.get_new_messages_and_save_to_db() # Force a refresh for local db.
conversation_dict, _ = self.manager.get_new_messages(
task_group_id=self.manager.task_group_id,
Expand Down Expand Up @@ -276,6 +277,6 @@ def episode_done(self):

def shutdown(self):
# Loop to ensure all HITs are done
while self.manager.get_approval_status_count(task_group_id=self.manager.task_group_id, conversation_id=self.conversation_id, approval_status='pending', requester_key=self.manager.requester_key_gt) < len(self.mturk_agent_ids):
while self.manager.get_approval_status_count(task_group_id=self.manager.task_group_id, conversation_id=self.conversation_id, approval_status='pending', requester_key=self.manager.requester_key_gt) < len(self.manager.mturk_agent_ids):
time.sleep(polling_interval)
print('Conversation ID: ' + str(self.conversation_id) + ', Agent ID: ' + self.id + ' - HIT is done.')
71 changes: 64 additions & 7 deletions parlai/mturk/core/data_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import sys
import time
import json
import math
from sqlalchemy import Column, ForeignKey, Integer, String, Boolean, UnicodeText
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import create_engine, func

is_python_2 = False
Expand All @@ -26,7 +27,7 @@ class Message(Base):
__tablename__ = 'message'
id = Column(Integer, primary_key=True)
task_group_id = Column(String(255), index=True) # We assign a new task_group_id for each HIT group
conversation_id = Column(Integer, index=True)
conversation_id = Column(String(255), index=True)
agent_id = Column(String(255))
message_content = Column(UnicodeText)

Expand All @@ -35,7 +36,7 @@ class MTurkHITInfo(Base):
__tablename__ = 'mturk_hit_info'
id = Column(Integer, primary_key=True)
task_group_id = Column(String(255), index=True)
conversation_id = Column(Integer, index=True)
conversation_id = Column(String(255), index=True)
assignment_id = Column(String(255))
hit_id = Column(String(255))
worker_id = Column(String(255))
Expand All @@ -44,12 +45,54 @@ class MTurkHITInfo(Base):

def as_dict(self):
    """Return this row as a plain dict mapping column name to attribute value."""
    row = {}
    for column in self.__table__.columns:
        column_name = column.name
        row[column_name] = getattr(self, column_name)
    return row


def init_database(host, db_name, username, password):


class MTurkHITAssignmentInfo(Base):
    """ORM model for the 'mturk_hit_assignment_info' table.

    Each row pairs a task group with an agent id. Rows for the same
    (task_group_id, agent_id) pair are inserted and then ranked by ``id`` in
    get_hit_index_and_assignment_index to derive HIT/assignment indices.
    """
    __tablename__ = 'mturk_hit_assignment_info'
    # Integer primary key; used as the ordering key when ranking rows.
    id = Column(Integer, primary_key=True)
    # Task group this row belongs to (indexed).
    task_group_id = Column(String(255), index=True)
    # Agent id this row belongs to (indexed).
    agent_id = Column(String(255), index=True)


def is_database_schema_consistent(Base, engine):
    """Probe whether the database accepts rows shaped like the current models.

    For each of Message, MTurkHITInfo and MTurkHITAssignmentInfo, one probe
    row with id=0 is inserted, committed, deleted and committed again.

    Args:
        Base: declarative base of the models. Not used by the probe; kept so
            existing callers keep working.
        engine: SQLAlchemy engine bound to the database to check.

    Returns:
        True if every probe row could be inserted and deleted, False if any
        step raised an ordinary exception.
    """
    session = scoped_session(sessionmaker(bind=engine))

    try:
        # Probes are built inside the try block so that a model/keyword
        # mismatch is reported as "inconsistent", like a database error.
        probes = (
            Message(
                id=0, task_group_id='Test', conversation_id='Test',
                agent_id='Test', message_content='Test'),
            MTurkHITInfo(
                id=0, task_group_id='Test', conversation_id='Test',
                assignment_id='Test', hit_id='Test', worker_id='Test',
                is_sandbox=True, approval_status='Test'),
            MTurkHITAssignmentInfo(
                id=0, task_group_id='Test', agent_id='Test'),
        )
        for probe in probes:
            session.add(probe)
            session.commit()
            session.delete(probe)
            session.commit()
        return True
    except Exception:
        # Deliberately broad: any failure means the schema cannot be trusted.
        # Unlike a bare "except:", KeyboardInterrupt and SystemExit still
        # propagate, so the user can abort the check.
        return False
    finally:
        # Closing the scoped session discards a failed transaction and
        # releases its connection instead of leaving both dangling.
        session.remove()


def init_database(host, db_name, username, password, should_check_schema_consistency=False):
# Create an engine
engine = create_engine('postgres://'+username+':'+password+'@'+host+':5432/'+db_name)


if should_check_schema_consistency and not is_database_schema_consistent(Base, engine):
# Database schema is inconsistent
input_key = input("Remote database schema is inconsistent. Please stop all other ParlAI MTurk instances, and press any key to continue:")
print('Creating database schema...')
Base.metadata.drop_all(engine)

# Create all tables in the engine. This is equivalent to "Create Table"
# statements in raw SQL.
Base.metadata.create_all(engine)
Expand Down Expand Up @@ -169,6 +212,20 @@ def get_new_messages(db_session, task_group_id, conversation_id=None, after_mess
return conversation_dict, last_message_id


def get_hit_index_and_assignment_index(db_session, task_group_id, agent_id, num_assignments):
    """Register a new assignment row and return its 1-based HIT/assignment indices.

    A new MTurkHITAssignmentInfo row is inserted for (task_group_id, agent_id).
    Its zero-based position among all rows with the same task group and agent,
    ordered by id, is then split by ``num_assignments``.

    Args:
        db_session: SQLAlchemy session used for the insert and the lookup.
        task_group_id: task group the assignment belongs to.
        agent_id: agent id the assignment belongs to.
        num_assignments: number of assignments per HIT (non-zero integer).

    Returns:
        dict with integer 'hit_index' and 'assignment_index', both starting at 1.

    Raises:
        ZeroDivisionError: if ``num_assignments`` is 0.
        ValueError: if the new row's id is missing from the lookup result.
    """
    new_assignment = MTurkHITAssignmentInfo(task_group_id=task_group_id, agent_id=agent_id)
    db_session.add(new_assignment)
    db_session.commit()
    # The id is read only after the commit above.
    new_assignment_id = new_assignment.id

    id_rows = db_session.query(MTurkHITAssignmentInfo.id) \
        .filter(MTurkHITAssignmentInfo.task_group_id == task_group_id) \
        .filter(MTurkHITAssignmentInfo.agent_id == agent_id) \
        .order_by(MTurkHITAssignmentInfo.id).all()
    assignment_ids = [row_id for (row_id,) in id_rows]
    position = assignment_ids.index(new_assignment_id)

    # Integer divmod keeps both indices ints on Python 2 and 3;
    # math.floor(a / b) returns a float on Python 2.
    hit_offset, assignment_offset = divmod(position, num_assignments)
    return {'hit_index': hit_offset + 1, 'assignment_index': assignment_offset + 1}


def set_hit_info(db_session, task_group_id, conversation_id, assignment_id, hit_id, worker_id, is_sandbox, approval_status='pending'):
existing_object = db_session.query(MTurkHITInfo) \
.filter(MTurkHITInfo.task_group_id==task_group_id) \
Expand Down
Loading

0 comments on commit 3d86ebe

Please sign in to comment.