Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TINKERPOP-2555 - Remote Transaction Support (g.tx()) for gremlin-python #1515

Merged
merged 5 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ jobs:
run: |
touch gremlin-python/.glv
mvn clean install -pl -:gremlin-javascript,-gremlin-dotnet,-:gremlin-dotnet-source,-:gremlin-dotnet-tests,-:gremlint -q -DskipTests -Dci
mvn verify -pl gremlin-python
mvn verify -pl gremlin-python -DincludeNeo4j
dotnet:
name: .NET
timeout-minutes: 1200 # 20 minutes
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
[[release-3-5-2]]
=== TinkerPop 3.5.2 (Release Date: NOT OFFICIALLY RELEASED YET)

* Added support for `g.Tx()` in Python.
* Added logging in in Python.
* Fixed shutdown cleanup issue in Python aiohttp transport layer.
* Added a `NoSugarTranslator` translator to `PythonTranslator` which translates Gremlin queries to Python without syntactic sugar (ex `g.V().limit(1)` instead of `g.V()[0:1]`)
* Added support for `g.Tx()` in .NET.
* Added support for `with()` constant options to `io()`.
Expand Down
122 changes: 122 additions & 0 deletions gremlin-python/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ limitations under the License.
<!-- provides a way to convert maven.test.skip value to skipTests for use in skipping python tests -->
<maven.test.skip>false</maven.test.skip>
<skipTests>${maven.test.skip}</skipTests>
<TEST_TRANSACTIONS>false</TEST_TRANSACTIONS>
<gremlin.server.dir>${project.parent.basedir}/gremlin-server</gremlin.server.dir>
</properties>
<build>
Expand Down Expand Up @@ -216,6 +217,7 @@ limitations under the License.
<target>
<exec executable="env/bin/python" dir="${project.build.directory}/python3"
failonerror="true">
<env key="TEST_TRANSACTIONS" value="${TEST_TRANSACTIONS}"/>
<env key="PYTHONPATH" value=""/>
<env key="KRB5_CONFIG" value="${project.build.directory}/kdc/krb5.conf"/>
<env key="KRB5CCNAME" value="${project.build.directory}/kdc/test-tkt.cc"/>
Expand All @@ -224,19 +226,22 @@ limitations under the License.
<!-- radish seems to like all dependencies in place -->
<exec executable="env/bin/python" dir="${project.build.directory}/python3"
failonerror="true">
<env key="TEST_TRANSACTIONS" value="${TEST_TRANSACTIONS}"/>
<env key="PYTHONPATH" value=""/>
<arg line="setup.py install"/>
</exec>
<!-- run for graphson 3.0 -->
<exec executable="env/bin/radish" dir="${project.build.directory}/python3"
failonerror="true">
<env key="TEST_TRANSACTIONS" value="${TEST_TRANSACTIONS}"/>
<env key="PYTHONPATH" value=""/>
<env key="PYTHONIOENCODING" value="utf-8:surrogateescape"/>
<arg line="-f dots -e -t -b ${project.build.directory}/python3/radish ${project.basedir}/../gremlin-test/features/ --user-data=&quot;serializer=application/vnd.gremlin-v3.0+json&quot;"/> <!-- -no-line-jump -->
</exec>
<!-- run for graphbinary 1.0 -->
<exec executable="env/bin/radish" dir="${project.build.directory}/python3"
failonerror="true">
<env key="TEST_TRANSACTIONS" value="${TEST_TRANSACTIONS}"/>
<env key="PYTHONPATH" value=""/>
<env key="PYTHONIOENCODING" value="utf-8:surrogateescape"/>
<arg line="-f dots -e -t -b ${project.build.directory}/python3/radish ${project.basedir}/../gremlin-test/features/ --user-data=&quot;serializer=application/vnd.graphbinary-v1.0&quot;"/> <!-- -no-line-jump -->
Expand All @@ -260,6 +265,11 @@ limitations under the License.
<artifactId>gremlin-test</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.tinkerpop</groupId>
<artifactId>neo4j-gremlin</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
Expand Down Expand Up @@ -354,6 +364,118 @@ limitations under the License.
</plugins>
</build>
</profile>
<!--
This profile will include neo4j for purposes of transactional testing within Gremlin Server.
Tests that require neo4j specifically will be "ignored" if this profile is not turned on.
-->
<profile>
<id>include-neo4j</id>
<activation>
<activeByDefault>false</activeByDefault>
<property>
<name>includeNeo4j</name>
</property>
</activation>
<properties>
<TEST_TRANSACTIONS>true</TEST_TRANSACTIONS>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.gmavenplus</groupId>
<artifactId>gmavenplus-plugin</artifactId>
<dependencies>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-tinkerpop-api-impl</artifactId>
<version>0.9-3.4.0</version>
<exclusions>
<exclusion>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</exclusion>
<exclusion>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</exclusion>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
<exclusion>
<groupId>org.ow2.asm</groupId>
<artifactId>asm</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-core</artifactId>
<version>5.5.0</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>4.0.2</version>
</dependency>
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
<version>3.4.11</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</profile>
<!--
Provides a way to deploy the gremlinpython GLV to pypi. This cannot be part of the standard maven execution
because pypi does not have a staging environment like sonatype for releases. As soon as the release is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
# specific language governing permissions and limitations
# under the License.
#


import aiohttp
import asyncio
import async_timeout
from aiohttp import ClientResponseError
import logging
lyndonbauto marked this conversation as resolved.
Show resolved Hide resolved

from gremlin_python.driver.transport import AbstractBaseTransport

Expand Down Expand Up @@ -58,6 +57,11 @@ def __init__(self, call_from_event_loop=None, read_timeout=None, write_timeout=N
if "ssl_options" in self._aiohttp_kwargs:
self._aiohttp_kwargs["ssl"] = self._aiohttp_kwargs.pop("ssl_options")

def __del__(self):
# Close will only actually close if things are left open, so this is safe to call.
# Clean up any connection resources and close the event loop.
self.close()

def connect(self, url, headers=None):
# Inner function to perform async connect.
async def async_connect():
Expand Down Expand Up @@ -116,10 +120,13 @@ async def async_read():
def close(self):
# Inner function to perform async close.
async def async_close():
if not self._websocket.closed:
if self._websocket is not None and not self._websocket.closed:
await self._websocket.close()
if not self._client_session.closed:
self._websocket = None

if self._client_session is not None and not self._client_session.closed:
await self._client_session.close()
self._client_session = None

# If the loop is not closed (connection hasn't already been closed)
if not self._loop.is_closed():
Expand Down
56 changes: 27 additions & 29 deletions gremlin-python/src/main/python/gremlin_python/driver/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# specific language governing permissions and limitations
# under the License.
#
import logging
from concurrent.futures import ThreadPoolExecutor

from six.moves import queue
Expand All @@ -31,16 +32,17 @@
def cpu_count():
return None

__author__ = 'David M. Brown (davebshow@gmail.com)'
__author__ = 'David M. Brown (davebshow@gmail.com), Lyndon Bauto (lyndonb@bitquilltech.com)'


class Client:

def __init__(self, url, traversal_source, protocol_factory=None,
transport_factory=None, pool_size=None, max_workers=None,
message_serializer=None, username="", password="",
kerberized_service="", headers=None, session="",
kerberized_service="", headers=None, session=None,
lyndonbauto marked this conversation as resolved.
Show resolved Hide resolved
**transport_kwargs):
logging.info("Creating Client with url '%s'", url)
self._url = url
self._headers = headers
self._traversal_source = traversal_source
Expand All @@ -52,7 +54,7 @@ def __init__(self, url, traversal_source, protocol_factory=None,
self._username = username
self._password = password
self._session = session
self._sessionEnabled = (session != "")
self._session_enabled = (session is not None and session != "")
if transport_factory is None:
try:
from gremlin_python.driver.aiohttp.transport import (
Expand All @@ -71,7 +73,7 @@ def protocol_factory(): return protocol.GremlinServerWSProtocol(
password=self._password,
kerberized_service=kerberized_service)
self._protocol_factory = protocol_factory
if self._sessionEnabled:
if self._session_enabled:
if pool_size is None:
pool_size = 1
elif pool_size != 1:
Expand Down Expand Up @@ -107,20 +109,12 @@ def _fill_pool(self):
self._pool.put_nowait(conn)

def close(self):
if self._sessionEnabled:
self._close_session()
logging.info("Closing Client with url '%s'", self._url)
while not self._pool.empty():
conn = self._pool.get(True)
conn.close()
self._executor.shutdown()

def _close_session(self):
lyndonbauto marked this conversation as resolved.
Show resolved Hide resolved
message = request.RequestMessage(
processor='session', op='close',
args={'session': self._session})
conn = self._pool.get(True)
return conn.write(message).result()

def _get_connection(self):
protocol = self._protocol_factory()
return connection.Connection(
Expand All @@ -129,24 +123,28 @@ def _get_connection(self):
headers=self._headers)

def submit(self, message, bindings=None, request_options=None):
return self.submitAsync(message, bindings=bindings, request_options=request_options).result()
return self.submit_async(message, bindings=bindings, request_options=request_options).result()

def submitAsync(self, message, bindings=None, request_options=None):
lyndonbauto marked this conversation as resolved.
Show resolved Hide resolved
def submit_async(self, message, bindings=None, request_options=None):
logging.debug("message '%s'", str(message))
args = {'gremlin': message, 'aliases': {'g': self._traversal_source}}
processor = ''
op = 'eval'
if isinstance(message, traversal.Bytecode):
message = request.RequestMessage(
processor='traversal', op='bytecode',
args={'gremlin': message,
'aliases': {'g': self._traversal_source}})
elif isinstance(message, str):
message = request.RequestMessage(
processor='', op='eval',
args={'gremlin': message,
'aliases': {'g': self._traversal_source}})
if bindings:
message.args.update({'bindings': bindings})
if self._sessionEnabled:
message = message._replace(processor='session')
message.args.update({'session': self._session})
op = 'bytecode'
processor = 'traversal'

if isinstance(message, str) and bindings:
args['bindings'] = bindings

if self._session_enabled:
args['session'] = str(self._session)
processor = 'session'

if isinstance(message, traversal.Bytecode) or isinstance(message, str):
logging.info("processor='%s', op='%s', args='%s'", str(processor), str(op), str(args))
lyndonbauto marked this conversation as resolved.
Show resolved Hide resolved
message = request.RequestMessage(processor=processor, op=op, args=args)

conn = self._pool.get(True)
if request_options:
message.args.update(request_options)
Expand Down
Loading