Skip to content Skip to sidebar Skip to footer

How Can Python Observe Changes To Mongodb's Oplog

I have multiple Python scripts writing to Mongodb using pyMongo. How can another Python script observe changes to a Mongo query and perform some function when the change occurs? mo

Solution 1:

I wrote a incremental backup tool for MongoDB some time ago, in Python. The tool monitors data changes by tailing the oplog. Here is the relevant part of the code.

Updated answer, MongDB 3.6+

As datdinhquoc cleverly points out in the comments below, for MongoDB 3.6 and up there are Change Streams.

Updated answer, pymongo 3

from time import sleep

from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect

# Time to wait for data or connection.
_SLEEP = 1.0if __name__ == '__main__':
    oplog = MongoClient().local.oplog.rs
    stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']

    whileTrue:
        kw = {}

        kw['filter'] = {'ts': {'$gt': stamp}}
        kw['cursor_type'] = CursorType.TAILABLE_AWAIT
        kw['oplog_replay'] = True

        cursor = oplog.find(**kw)

        try:
            while cursor.alive:
                for doc in cursor:
                    stamp = doc['ts']

                    print(doc)  # Do something with doc.

                sleep(_SLEEP)

        except AutoReconnect:
            sleep(_SLEEP)

Also see http://api.mongodb.com/python/current/examples/tailable.html.

Original answer, pymongo 2

from time import sleep

from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp

# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}

# Time to wait for data or connection.
_SLEEP = 10if __name__ == '__main__':
    db = MongoClient().local

    whileTrue:
        query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
        cursor = db.oplog.rs.find(query, **_TAIL_OPTS)

        cursor.add_option(_QUERY_OPTIONS['oplog_replay'])

        try:
            while cursor.alive:
                try:
                    doc = next(cursor)

                    # Do something with doc.except (AutoReconnect, StopIteration):
                    sleep(_SLEEP)

        finally:
            cursor.close()

Solution 2:

I ran into this issue today and haven't found an updated answer anywhere.

The Cursor class has changed as of v3.0 and no longer accepts the tailable and await_data arguments. This example will tail the oplog and print the oplog record when it finds a record newer than the last one it found.

# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735# to work with pymongo 3.0import pymongo
from pymongo.cursor import CursorType

c = pymongo.MongoClient()

# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.#oplog = c.local.oplog.rs

first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']

whileTrue:
    cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
    while cursor.alive:
        for doc in cursor:
            ts = doc['ts']
            print doc
            # Work with doc here

Solution 3:

Query the oplog with a tailable cursor.

It is actually funny, because oplog-monitoring is exactly what the tailable-cursor feature was added for originally. I find it extremely useful for other things as well (e.g. implementing a mongodb-based pubsub, see this post for example), but that was the original purpose.

Solution 4:

I had the same issue. I put together this rescommunes/oplog.py. Check comments and see __main__ for an example of how you could use it with your script.

Post a Comment for "How Can Python Observe Changes To Mongodb's Oplog"