diff --git a/pyspider/run.py b/pyspider/run.py
index 2dc6a5a32..2fe6de1f5 100755
--- a/pyspider/run.py
+++ b/pyspider/run.py
@@ -638,6 +638,35 @@ def one(ctx, interactive, enable_phantomjs, scripts):
             phantomjs_obj.quit()


+@cli.command()
+@click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
+@click.argument('project', nargs=1)
+@click.argument('message', nargs=1)
+@click.pass_context
+def send_message(ctx, scheduler_rpc, project, message):
+    """Send MESSAGE to PROJECT as an _on_message task via the scheduler xmlrpc."""
+    if isinstance(scheduler_rpc, six.string_types):
+        scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
+    if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
+        # the variable is expected to hold 'tcp://host:port'; drop the scheme
+        scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % (
+            os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
+    if scheduler_rpc is None:
+        scheduler_rpc = connect_rpc(ctx, None, 'http://localhost:23333/')
+
+    return scheduler_rpc.send_task({
+        'taskid': utils.md5string('data:,on_message'),
+        'project': project,
+        'url': 'data:,on_message',
+        'fetch': {
+            'save': ('__command__', message),
+        },
+        'process': {
+            'callback': '_on_message',
+        }
+    })
+
+
 def main():
     cli()

diff --git a/pyspider/scheduler/scheduler.py b/pyspider/scheduler/scheduler.py
index e0bee2586..cfba8d915 100644
--- a/pyspider/scheduler/scheduler.py
+++ b/pyspider/scheduler/scheduler.py
@@ -447,6 +447,12 @@ def new_task(task):
             return False
         server.register_function(new_task, 'newtask')

+        def send_task(task):
+            '''dispatch task to fetcher'''
+            self.send_task(task)
+            return True
+        server.register_function(send_task, 'send_task')
+
         def update_project():
             self._force_update_project = True
         server.register_function(update_project, 'update_project')
diff --git a/tests/test_run.py b/tests/test_run.py
index ba2a7c517..44ea0902f 100644
--- a/tests/test_run.py
+++ b/tests/test_run.py
@@ -293,3 +293,44 @@ def wait_text(timeout=1):
         self.assertIn('scheduler exiting...', text)
         os.close(fd)
         os.kill(pid, signal.SIGINT)
+
+class TestSendMessage(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(self):
+        shutil.rmtree('./data/tests', ignore_errors=True)
+        os.makedirs('./data/tests')
+
+        ctx = run.cli.make_context('test', [
+            '--taskdb', 'sqlite+taskdb:///data/tests/task.db',
+            '--projectdb', 'sqlite+projectdb:///data/tests/projectdb.db',
+            '--resultdb', 'sqlite+resultdb:///data/tests/resultdb.db',
+        ], None, obj=dict(testing_mode=True))
+        self.ctx = run.cli.invoke(ctx)
+
+        ctx = run.scheduler.make_context('scheduler', [], self.ctx)
+        scheduler = run.scheduler.invoke(ctx)
+        utils.run_in_thread(scheduler.xmlrpc_run)
+        utils.run_in_thread(scheduler.run)
+
+        time.sleep(1)
+
+    @classmethod
+    def tearDownClass(self):
+        for each in self.ctx.obj.instances:
+            each.quit()
+        time.sleep(1)
+
+        shutil.rmtree('./data/tests', ignore_errors=True)
+
+    def test_10_send_message(self):
+        ctx = run.send_message.make_context('send_message', [
+            'test_project', 'test_message'
+        ], self.ctx)
+        self.assertTrue(run.send_message.invoke(ctx))
+        while True:
+            task = self.ctx.obj.scheduler2fetcher.get(timeout=1)
+            if task['url'] == 'data:,on_message':
+                break
+        self.assertEqual(task['process']['callback'], '_on_message')
+