How Can Python Observe Changes To Mongodb's Oplog
Solution 1:
I wrote a incremental backup tool for MongoDB some time ago, in Python. The tool monitors data changes by tailing the oplog
. Here is the relevant part of the code.
Updated answer, MongDB 3.6+
As datdinhquoc cleverly points out in the comments below, for MongoDB 3.6 and up there are Change Streams.
Updated answer, pymongo 3
from time import sleep
from pymongo import MongoClient, ASCENDING
from pymongo.cursor import CursorType
from pymongo.errors import AutoReconnect
# Time to wait for data or connection.
_SLEEP = 1.0if __name__ == '__main__':
oplog = MongoClient().local.oplog.rs
stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']
whileTrue:
kw = {}
kw['filter'] = {'ts': {'$gt': stamp}}
kw['cursor_type'] = CursorType.TAILABLE_AWAIT
kw['oplog_replay'] = True
cursor = oplog.find(**kw)
try:
while cursor.alive:
for doc in cursor:
stamp = doc['ts']
print(doc) # Do something with doc.
sleep(_SLEEP)
except AutoReconnect:
sleep(_SLEEP)
Also see http://api.mongodb.com/python/current/examples/tailable.html.
Original answer, pymongo 2
from time import sleep
from pymongo import MongoClient
from pymongo.cursor import _QUERY_OPTIONS
from pymongo.errors import AutoReconnect
from bson.timestamp import Timestamp
# Tailable cursor options.
_TAIL_OPTS = {'tailable': True, 'await_data': True}
# Time to wait for data or connection.
_SLEEP = 10if __name__ == '__main__':
db = MongoClient().local
whileTrue:
query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}} # Replace with your query.
cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
try:
while cursor.alive:
try:
doc = next(cursor)
# Do something with doc.except (AutoReconnect, StopIteration):
sleep(_SLEEP)
finally:
cursor.close()
Solution 2:
I ran into this issue today and haven't found an updated answer anywhere.
The Cursor class has changed as of v3.0 and no longer accepts the tailable
and await_data
arguments. This example will tail the oplog and print the oplog record when it finds a record newer than the last one it found.
# Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735# to work with pymongo 3.0import pymongo
from pymongo.cursor import CursorType
c = pymongo.MongoClient()
# Uncomment this for master/slave.
oplog = c.local.oplog['$main']
# Uncomment this for replica sets.#oplog = c.local.oplog.rs
first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
ts = first['ts']
whileTrue:
cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
while cursor.alive:
for doc in cursor:
ts = doc['ts']
print doc
# Work with doc here
Solution 3:
Query the oplog with a tailable cursor.
It is actually funny, because oplog-monitoring is exactly what the tailable-cursor feature was added for originally. I find it extremely useful for other things as well (e.g. implementing a mongodb-based pubsub, see this post for example), but that was the original purpose.
Solution 4:
I had the same issue. I put together this rescommunes/oplog.py. Check comments and see __main__
for an example of how you could use it with your script.
Post a Comment for "How Can Python Observe Changes To Mongodb's Oplog"