Session Management Example

Session Wrapping

The default connection pool that is used to execute queries uses a new session for every request. This fully isolates queries from each other, which is a good idea in most cases. There are, however, also scenarios in which multiple queries need to share some kind of state. This can be done with a special kind of connection pool, which by default spawns connections that use the same session.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
from mogwai import connection, properties
from mogwai.models import Vertex, Edge
from mogwai.tools import SessionPoolManager, BlueprintsWrapper, PartitionGraph
from mogwai.exceptions import MogwaiGremlinException

connection.setup('localhost')

##
#  Persist a session with SessionPoolManager
##

k = 10

with SessionPoolManager(bindings={'k': k}):
    gsk = connection.execute_query('"powers of ${k}"')
    pysk = "powers of {}".format(k)
    assert gsk == pysk

    kk = connection.execute_query("k * k")
    assert kk == k * k


##
#  Wrap the graph with a Blueprints Implementation
##

class BlueprintsWrapperVertex(Vertex):
    element_type = 'blueprints_wrapper_vertex'
    name = properties.String(required=True, max_length=128)


class BlueprintsWrapperEdge(Edge):
    element_type = 'blueprints_wrapper_edge'
    name = properties.String(required=True, max_length=128)

v0 = BlueprintsWrapperVertex.create(name="valid")

with BlueprintsWrapper(class_name='ReadOnlyGraph'):
    v0 = BlueprintsWrapperVertex.get(v0._id)
    try:
        BlueprintsWrapperVertex.create(name="illegal")
    except MogwaiGremlinException:
        print "java.lang.UnsupportedOperationException: \
               It is not possible to mutate a ReadOnlyGraph"


##
#  Treat the graph as a Partition Graph
##

with PartitionGraph(write='a'):
    v1 = BlueprintsWrapperVertex.create(name="only in a")
    v3 = BlueprintsWrapperVertex.create(name="started in a")

with PartitionGraph(write='b', read=['a']):
    v2 = BlueprintsWrapperVertex.create(name="only in b")
    e1 = BlueprintsWrapperEdge.create(outV=v2, inV=v1, name="only in b")
    v3.name = "still in a"
    v3.save()

with PartitionGraph(write='a'):
    v1 = BlueprintsWrapperVertex.get(v1._id)
    assert len(v1.bothE()) == 0
    try:
        BlueprintsWrapperVertex.get(v2._id)
    except BlueprintsWrapperVertex.DoesNotExist:
        print "vertex is located in partition B"

# outside of the session all the elements are accessible
print v1.bothE()[0]
# BlueprintsWrapperEdge(label=blueprints_wra..., values={'name': 'only in b'})

print dict(BlueprintsWrapperVertex.get(v3._id))
# {'_partition': 'a', 'name': 'still in a'}

Concurrent Session Wrapping

Note

The pool needs to be referenced explicitly by querying methods in the wrapper when the connection to Rexster is set up with e.g. eventlet or gevent.

The type of concurrency that is available in mogwai and rexpro is based on coroutines, which share the same global context. It would thus be unsafe to keep the session or its associated pool in a global variable. This would cause concurrently executed session wrappers to change the session key for all ongoing requests, also for those that should be executed within a different session.

All graph element methods that perform operations on the graph accept the optional keyword arguments transaction, isolate, and pool, which can override the defaults set by execute_query.

execute_query(query, params={}, transaction=True, isolate=True, pool=None

The following examples use eventlet for concurrent querying:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from mogwai import connection, properties
from mogwai.models import Vertex, Edge
from mogwai.tools import SessionPoolManager, PartitionGraph
import eventlet

connection.setup('localhost', concurrency='eventlet')

##
#  Persist a session with SessionPoolManager
##

def isolation_test(scope):
    wrapper_config = {
        'bindings': {'scope': scope},
        'pool_size': 5
    }
    scope_values = []

    with SessionPoolManager(**wrapper_config) as pool:
        for i in range(7):
            scope_val = connection.execute_query(
                "scope *= 2",
                isolate=False,
                pool=pool
            )
            scope_values.append(scope_val)

    return scope, scope_values

pile = eventlet.GreenPile()
[pile.spawn(isolation_test, i) for i in range(10)]

for scope, scope_values in pile:
    assert scope_values == [scope * 2**i for i in range(1, 8)]


##
#  Treat the graph as a Partition Graph
##

class BlueprintsWrapperVertex(Vertex):
    element_type = 'blueprints_wrapper_vertex'
    name = properties.String(required=True, max_length=128)


class BlueprintsWrapperEdge(Edge):
    element_type = 'blueprints_wrapper_edge'
    name = properties.String(required=True, max_length=128)

n = 5

with PartitionGraph(write='a') as pool:
    av_pile = eventlet.GreenPile()
    for i in range(n):
        av_pile.spawn(BlueprintsWrapperVertex.create, name="only in a", pool=pool)
    vertices_in_a = list(av_pile)

with PartitionGraph(write='b', read=['a']) as pool:
    bv_pile = eventlet.GreenPile()
    for i in range(n):
        bv_pile.spawn(BlueprintsWrapperVertex.create, name="only in b", pool=pool)

    [pile.spawn(isolation_test, i) for i in range(n)]

    be_pile = eventlet.GreenPile()
    for vb in bv_pile:
        va = vertices_in_a.pop()
        scope, scope_values = pile.next()
        va[str(scope)] = scope_values
        av_pile.spawn(va.save, pool=pool)

        be_pile.spawn(BlueprintsWrapperEdge.create, outV=vb, inV=va,
                      name="only in b", pool=pool)

    vertices_in_a = [av for av in av_pile]
    edges_in_b = [be for be in be_pile]

for v in BlueprintsWrapperVertex.all():
    print v._id, dict(v), v.outE()

[pile.spawn(v.delete) for v in BlueprintsWrapperVertex.all()]
list(pile)