8000 current progress implementing a loadbalancing platform · potatochip/corenlp-python@f6b2a9c · GitHub
[go: up one dir, main page]

Skip to content

Commit f6b2a9c

Browse files
author
Robert Elwell
committed
current progress implementing a loadbalancing platform
1 parent 8536125 commit f6b2a9c

File tree

3 files changed

+113
-7
lines changed

3 files changed

+113
-7
lines changed

corenlp/corenlp.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import pexpect
2929
import tempfile
3030
import shutil
31+
from loadbalancer import CoreNLPLoadBalancer
3132
from progressbar import ProgressBar, Fraction
3233
from unidecode import unidecode
3334
from subprocess import call
@@ -74,7 +75,7 @@ def __init__(self, value):
7475

7576
def __str__(self):
7677
return repr(self.value)
77-
78+
7879

7980
def init_corenlp_command(corenlp_path, memory, properties):
8081
"""
@@ -482,6 +483,8 @@ def batch_parse(input_folder, corenlp_path=DIRECTORY, memory="3g", raw_output=Fa
482483
parser = optparse.OptionParser(usage="%prog [OPTIONS]")
483484
parser.add_option('-p', '--port', default='8080',
484485
help='Port to serve on (default 8080)')
486+
parser.add_option('-o', '--ports', default=None,
487+
help='Multiple ports, separated by commas')
485488
parser.add_option('-H', '--host', default='127.0.0.1',
486489
help='Host to serve on (default localhost; 0.0.0.0 to make public)')
487490
parser.add_option('-q', '--quiet', action='store_false', default=True, dest='verbose',
@@ -495,14 +498,24 @@ def batch_parse(input_folder, corenlp_path=DIRECTORY, memory="3g", raw_output=Fa
495498
# server = jsonrpc.Server(jsonrpc.JsonRpc20(),
496499
# jsonrpc.TransportTcpIp(addr=(options.host, int(options.port))))
497500
try:
498-
server = SimpleJSONRPCServer((options.host, int(options.port)))
501+
if not options.ports:
502+
server = SimpleJSONRPCServer((options.host, int(options.port)))
499503

500-
nlp = StanfordCoreNLP(options.corenlp, properties=options.properties)
501-
server.register_function(nlp.parse)
504+
nlp = StanfordCoreNLP(options.corenlp, properties=options.properties)
505+
server.register_function(nlp.parse)
502506

503-
print 'Serving on http://%s:%s' % (options.host, options.port)
504-
# server.serve()
505-
server.serve_forever()
507+
print 'Serving on http://%s:%s' % (options.host, options.port)
508+
509+
server.serve_forever()
510+
else:
511+
server = SimpleJSONRPCServer((options.host, int(options.port)))
512+
lb = CoreNLPLoadBalancer(options)
513+
server.register_function(lb.send)
514+
server.register_function(lb.receive)
515+
516+
print 'Serving on http://%s:%s, with servers on ports %s' % (options.host, options.port, options.ports)
517+
518+
server.serve_forever()
506519
except KeyboardInterrupt:
507520
print >>sys.stderr, "Bye."
508521
exit()

corenlp/loadbalancer.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""
2+
A load balancing platform for the CoreNLP python server.
3+
This allows us to keep multiple instances of the server open
4+
at different ports, and allow the same script to handle
5+
loadbalancing so client scripts need not worry about such logic.
6+
"""
7+
8+
import os, requests, json, sys, jsonrpclib
9+
from subprocess import Popen
10+
from hashlib import sha1
11+
12+
class CoreNLPLoadBalancer:
13+
def __init__(self, options):
14+
self.tempdir = "/tmp/"
15+
self.ports = options.ports.split(',')
16+
self.host = options.host
17+
self.serverPool = []
18+
self.processPool = {}
19+
self.args = ["python", os.getcwd() + "/corenlp.py", \
20+
'--host=%s' % (options.host), \
21+
'--properties=%s' % (options.properties), \
22+
'--corenlp=%s' % (options.corenlp)]
23+
if not options.verbose:
24+
args += ['--quiet']
25+
self.portCounter = 0
26+
27+
28+
def startup(self):
29+
""" Open a traditional server subprocess in a new port """
30+
for port in self.ports:
31+
self.serverPool[port] = Popen(args + ['--port=%s' % str(port)])
32+
33+
def shutdown(self):
34+
for port in self.ports:
35+
self.serverPool[port].terminate()
36+
37+
def sendThreadedRequest(self, key, port):
38+
""" Create a process that communicates with the server in a thread to avoid blocking """
39+
host = 'http://%s:%s' % (self.host.replace('http://', ''), port)
40+
filename = self.tempdir+key+".tmp"
41+
self.processPool[key] = [Popen(['python', 'subserver.py', host, filename], stdout=PIPE)]
42+
43+
def send(self, text):
44+
"""
45+
Writes a temp file with the current text. The subserver script deletes this file for us.
46+
The response sent provides a sha1 key that corresponds to your requested document so we
47+
can correlate requests to responses.
48+
"""
49+
currentPort = self.ports[self.portCounter]
50+
key = sha1(text)
51+
filename = self.tempdir+key+".tmp"
52+
f = open(filename, 'w')
53+
f.write(text)
54+
f.close()
55 23DA +
self.sendThreadedRequest(key, currentPort)
56+
return {'status':'OK', 'key':key}
57+
58+
def receive(self, blocking=False):
59+
""" Returns all completed parses. Set blocking to True on your last iteration! """
60+
go = True
61+
response = []
62+
while go:
63+
for key in self.processPool.keys():
64+
process = self.processPool[key]
65+
if process.poll() != None:
66+
(out, error) = process.communicate()
67+
if out:
68+
try:
69+
response[key] = [json.loads(out)]
70+
except:
71+
pass
72+
del self.processPool[key]
73+
go = blocking and len(self.processPool) > 0
74+
return response

corenlp/subserver.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""
2+
This subserver scripts maintain a connection with child
3+
processes so that our requests are not blocking
4+
5+
arg 1: server
6+
arg 2: filename of tmp file
7+
8+
"""
9+
10+
import sys, jsonrpclib, os
11+
12+
server = jsonrpclib.Server(sys.argv[1])
13+
filename = sys.argv[2]
14+
text = "\n".join(open(filename, r)).readlines()
15+
16+
print server.parse(text)
17+
18+
os.remove(filename)
19+

0 commit comments

Comments
 (0)
0