got the loadbalancer working · relwell/stanford-corenlp-python@f0b4925 · GitHub
[go: up one dir, main page]

Skip to content

Commit f0b4925

Browse files
author
Robert Elwell
committed
got the loadbalancer working
1 parent f6b2a9c commit f0b4925

File tree

3 files changed

+58
-22
lines changed

3 files changed

+58
-22
lines changed

corenlp/corenlp.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,15 @@ def batch_parse(input_folder, corenlp_path=DIRECTORY, memory="3g", raw_output=Fa
511511
server = SimpleJSONRPCServer((options.host, int(options.port)))
512512
lb = CoreNLPLoadBalancer(options)
513513
server.register_function(lb.send)
514-
server.register_function(lb.receive)
514+
server.register_function(lb.getAll)
515+
server.register_function(lb.getCompleted)
516+
server.register_function(lb.getForKey)
515517

516518
print 'Serving on http://%s:%s, with servers on ports %s' % (options.host, options.port, options.ports)
517519

518520
server.serve_forever()
519521
except KeyboardInterrupt:
522+
if options.ports:
523+
lb.shutdown()
520524
print >>sys.stderr, "Bye."
521525
exit()

corenlp/loadbalancer.py

Lines changed: 52 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,29 @@
66
"""
77

88
import os, requests, json, sys, jsonrpclib
9-
from subprocess import Popen
9+
from subprocess import Popen, PIPE
1010
from hashlib import sha1
1111

1212
class CoreNLPLoadBalancer:
1313
def __init__(self, options):
1414
self.tempdir = "/tmp/"
1515
self.ports = options.ports.split(',')
1616
self.host = options.host
17-
self.serverPool = []
17+
self.serverPool = {}
1818
self.processPool = {}
19-
self.args = ["python", os.getcwd() + "/corenlp.py", \
19+
self.args = ["python", os.getcwd() + "/corenlp/corenlp.py", \
2020
'--host=%s' % (options.host), \
2121
'--properties=%s' % (options.properties), \
2222
'--corenlp=%s' % (options.corenlp)]
2323
if not options.verbose:
24-
args += ['--quiet']
24+
self.args += ['--quiet']
2525
self.portCounter = 0
26-
26+
self.startup()
2727

2828
def startup(self):
2929
""" Open a traditional server subprocess in a new port """
3030
for port in self.ports:
31-
self.serverPool[port] = Popen(args + ['--port=%s' % str(port)])
31+
self.serverPool[port] = Popen(self.args + ['--port=%s' % str(port)])
3232

3333
def shutdown(self):
3434
for port in self.ports:
@@ -38,7 +38,7 @@ def sendThreadedRequest(self, key, port):
3838
""" Create a process that communicates with the server in a thread to avoid blocking """
3939
host = 'http://%s:%s' % (self.host.replace('http://', ''), port)
4040
filename = self.tempdir+key+".tmp"
41-
self.processPool[key] = [Popen(['python', 'subserver.py', host, filename], stdout=PIPE)]
41+
self.processPool[key] = Popen(['python', os.getcwd()+'/corenlp/subserver.py', host, filename], stdout=PIPE)
4242

4343
def send(self, text):
4444
"""
@@ -47,28 +47,60 @@ def send(self, text):
4747
can correlate requests to responses.
4848
"""
4949
currentPort = self.ports[self.portCounter]
50-
key = sha1(text)
50+
key = sha1(text).hexdigest()
5151
filename = self.tempdir+key+".tmp"
5252
f = open(filename, 'w')
5353
f.write(text)
5454
f.close()
5555
self.sendThreadedRequest(key, currentPort)
5656
return {'status':'OK', 'key':key}
5757

58-
def receive(self, blocking=False):
59-
""" Returns all completed parses. Set blocking to True on your last iteration! """
60-
go = True
61-
response = []
62-
while go:
58+
def getCompleted(self):
59+
""" Returns all completed parses. Set blocking to True on your last iteration to get all data """
60+
docResponse = {}
61+
response = {'status':'OK'}
62+
try:
6363
for key in self.processPool.keys():
64+
print key
6465
process = self.processPool[key]
66+
print process
6567
if process.poll() != None:
66-
(out, error) = process.communicate()
67-
if out:
68-
try:
69-
response[key] = [json.loads(out)]
70-
except:
71-
pass
68+
docResponse[key] = self.getForFinishedProcess(process)
7269
del self.processPool[key]
73-
go = blocking and len(self.processPool) > 0
70+
except:
71+
response['status'] = 'ERROR'
72+
response['error'] = sys.exc_info()[1]
73+
response['parses'] = docResponse
74+
return response
75+
76+
def getAll(self):
77+
""" Blocking counterpart to getCompleted. Wait for all currently open processes to complete and send response. """
78+
response = {}
79+
for key in self.processPool.keys():
80+
response[key] = self.getForKey(key)
81+
return {'status':'OK', 'parses':response}
82+
83+
def getForFinishedProcess(self, process):
84+
""" Returns a dictionary with json string or empty dictionary """
85+
response = {}
86+
print 'communicating'
87+
(out, error) = process.communicate()
88+
if out:
89+
try:
90+
response = json.loads(out)
91+
except:
92+
print sys.exc_info()
93+
pass
94+
return response
95+
96+
def getForKey(self, key):
97+
""" Retrieves a response for a given key. This is blocking. """
98+
response = {}
99+
if key in self.processPool.keys():
100+
process = self.processPool[key]
101+
if process.poll == None:
102+
process.wait()
103+
response[key] = self.getForFinishedProcess(process)
104+
else:
105+
response[key] = {}
74106
return response

corenlp/subserver.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
server = jsonrpclib.Server(sys.argv[1])
1313
filename = sys.argv[2]
14-
text = "\n".join(open(filename, r)).readlines()
14+
text = "\n".join(open(filename, 'r').readlines())
1515

1616
print server.parse(text)
1717

0 commit comments

Comments
 (0)
0