把MongoDB的业务数据采集到Elasticsearch

背景

有这样一个业务场景, 玩家可以在游戏里面建立各种房间, 房间里面可以运行各种各样的游戏. 其中, 房间的数据就是用MongoDB记录, 每一条文档对应的是一个房间. 为了观察房间状态的变化以及统计玩家的用户行为, 需要对MongoDB的数据进行收集和统计.

本来打算简单用个Grafana设置MongoDB做数据源, 再弄个dashboard就完事了, 奈何Grafana需要商用证书才可以使用MongoDB的插件, 最后就计划用ES.

搜了一圈开源社区, 也没有找到可以订阅MongoDB再输出到ES的中间件, 最后只好自己写了.

环境信息:

  • Python3.9.6

  • MongoDB 4.0.3-cmongo-Community

  • ElasticSearch 7.10.1

可行性验证

需求很简单, 就是要定期查询MongoDB中一部分特定文档, 插入到ES的特定索引中.

计划使用Python, 并通过交互式运行. 需要完成对MongoDB的查询以及到ES的入库过程.

MongoDB客户端

使用pymongo 4.3.3版本, 关于连接MongoDB方法, 查看文档如下:

doc_pymongo.png

在本例中, 连接MongoDB的代码如下:

1
2
3
4
5
6
7
>>> import pymongo
>>> dburl="mongodb://mongouser:xxxxxx@10.3.2.26:27017,10.3.2.4:27017,10.3.2.24:27017/?replicaSet=aaa111"
>>> myclient=pymongo.MongoClient(dburl)
>>> myclient
MongoClient(host=['10.3.2.26:27017', '10.3.2.24:27017', '10.3.2.4:27017'], document_class=dict, tz_aware=False, connect=True, replicaset='aaa111')
>>> type(myclient)
<class 'pymongo.mongo_client.MongoClient'>

初始化了一个实例myclient, 用于连接MongoDB, 后面我们的所有MongoDB相关操作, 都是通过这个实例进行.

MongoClient这个类, 在site-packages/pymongo/mongo_client.py中定义.

也可以在python的交互式界面通过help()或者dir()功能去查看文档, 但笔者一般习惯直接看线上文档.

数据查询

在MongoClient类中有__getitem__方法, 可以返回所有的Database:

1
2
3
4
5
6
7
8
9
10
11
12
13
class MongoClient(common.BaseObject, Generic[_DocumentType]):
...省略...
def __getitem__(self, name: str) -> database.Database[_DocumentType]:
"""Get a database by name.

Raises :class:`~pymongo.errors.InvalidName` if an invalid
database name is used.

:Parameters:
- `name`: the name of the database to get
"""
return database.Database(self, name)
...省略...

接着, 在文件database.py中约定的Database类中, 也有__getitem__方法, 可以返回所有的Collection:

1
2
3
4
5
6
7
8
9
10
11
12
13
class Database(common.BaseObject, Generic[_DocumentType]):
"""A Mongo database."""
...省略...
def __getitem__(self, name: str) -> "Collection[_DocumentType]":
"""Get a collection of this database by name.

Raises InvalidName if an invalid collection name is used.

:Parameters:
- `name`: the name of the collection to get
"""
return Collection(self, name)
...省略...

又可以继续深挖, 在collection.py定义的Collection中, 有find方法, 可以开始查询:

1
2
3
4
5
class Collection(common.BaseObject, Generic[_DocumentType]):
"""A Mongo collection."""
...省略...
def find(self, *args: Any, **kwargs: Any) -> Cursor[_DocumentType]:
return Cursor(self, *args, **kwargs)

最后, 一通摸索下来, 要搜索ZoneService库里面的zones集合, 我们就可以这样进行查询:

1
2
3
4
>>> result = myclient['ZoneService']['zones'].find({})
>>> result[0]
{'_id': ObjectId('649c11644226789ff4f682ae'), 'groupId': '200', 'projectType': 4, 'version': 'this is a secret', 'role': '', 'platformBitTypesForNavigation': 0, 'platformBitTypesForMatching': 7, 'serverGuid': '01002405-76BD-4BFC-BCED-8DE4C8541AFA', 'zoneGuid': 'C6385685-74E9-4673-A696-F0EDF7040296', 'zoneExtendType': 4, 'zoneSearchKey': '3901862586:1', 'zoneSearchUniqueKey': 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467', 'zoneId': 0, 'ugcId': '3901862586', 'ugcVersionId': '1', 'ugcTitle': '测试数据', 'maxPlayerCount': 2, 'reservedCount': 0, 'connectionCount': 0, 'activeCount': 0, 'channelNo': 0, 'enterLimit': False, 'enterOwner': False, 'ownerMemberNo': 0, 'reservedUsers': [], 'enteredUsers': [], 'isCheckValid': True, 'createAt': datetime.datetime(2023, 6, 28, 10, 54, 28, 693000), 'lastTryEnterableCheckingAt': datetime.datetime(2023, 6, 27, 10, 54, 28, 693000)}
>>>

可以看到, 我们可以通过下标直接查看查询结果, 也可以使用for进行遍历. MongoDB中的文档, 以字典的形式呈现出来.

连接ES

pymongo不一样, elasticsearch库的代码结构又是另一个风格.

首先我们要在elasticsearch/client/__init__.py里面看到客户端类Elasticsearch, 在初始化的时候, 会通过引用其他对象, 实现对ES的REST调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Elasticsearch(object):
def __init__(self, hosts=None, transport_class=Transport, **kwargs):
"""
:arg hosts: list of nodes, or a single node, we should connect to.
Node should be a dictionary ({"host": "localhost", "port": 9200}),
the entire dictionary will be passed to the :class:`~elasticsearch.Connection`
class as kwargs, or a string in the format of ``host[:port]`` which will be
translated to a dictionary automatically. If no value is given the
:class:`~elasticsearch.Connection` class defaults will be used.

:arg transport_class: :class:`~elasticsearch.Transport` subclass to use.

:arg kwargs: any additional arguments will be passed on to the
:class:`~elasticsearch.Transport` class and, subsequently, to the
:class:`~elasticsearch.Connection` instances.
"""
self.transport = transport_class(_normalize_hosts(hosts), **kwargs)

# namespaced clients for compatibility with API names
self.async_search = AsyncSearchClient(self)
self.autoscaling = AutoscalingClient(self)
self.cat = CatClient(self)
self.cluster = ClusterClient(self)
self.dangling_indices = DanglingIndicesClient(self)
self.indices = IndicesClient(self)
self.ingest = IngestClient(self)
self.nodes = NodesClient(self)
self.remote = RemoteClient(self)
self.snapshot = SnapshotClient(self)
self.tasks = TasksClient(self)

self.xpack = XPackClient(self)
self.ccr = CcrClient(self)
self.data_frame = Data_FrameClient(self)
self.deprecation = DeprecationClient(self)
self.enrich = EnrichClient(self)
self.eql = EqlClient(self)
self.graph = GraphClient(self)
self.ilm = IlmClient(self)
self.indices = IndicesClient(self)
self.license = LicenseClient(self)
self.migration = MigrationClient(self)
self.ml = MlClient(self)
self.monitoring = MonitoringClient(self)
self.rollup = RollupClient(self)
self.searchable_snapshots = SearchableSnapshotsClient(self)
self.security = SecurityClient(self)
self.slm = SlmClient(self)
self.sql = SqlClient(self)
self.ssl = SslClient(self)

根据文档, 在python中连接ES, 方法如下:

1
2
3
4
5
>>> from elasticsearch import Elasticsearch  
>>> es_url="http://elastic:******@10.3.2.37:9200/"
>>> es = Elasticsearch(host=es_url)
>>> type(es)
<class 'elasticsearch.client.Elasticsearch'>

Elasticsearch类中, 还定义了info()方法, 查看连接信息:

1
2
3
4
5
6
7
8
9
def info(self, params=None, headers=None):
"""
Returns basic information about the cluster.

`<https://www.elastic.co/guide/en/elasticsearch/reference/7.10/index.html>`_
"""
return self.transport.perform_request(
"GET", "/", params=params, headers=headers
)

从代码看, 就是请求了ES服务的/, 正常情况下, 会返回ES的一些集群信息. 测试一下:

1
2
>>> es.info()
{'name': '1642044271000021132', 'cluster_name': 'es-di9ad7tb', 'cluster_uuid': 'yZbbvEbAQvCuF9tZDilvMA', 'version': {'number': '7.10.1', 'build_flavor': 'default', 'build_type': 'tar', 'build_hash': '187b1511f798e4d23625ffa92ccf5c44840e2650', 'build_date': '2021-12-22T12:45:12.223537200Z', 'build_snapshot': False, 'lucene_version': '8.7.0', 'minimum_wire_compatibility_version': '6.8.0', 'minimum_index_compatibility_version': '6.0.0-beta1'}, 'tagline': 'You Know, for Search'}

向ES插入数据

使用Elasticsearch类中的index方法, 通过POST的方式, 把文档插入到ES的索引中.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def index(self, index, body, doc_type=None, id=None, params=None, headers=None):
for param in (index, body):
if param in SKIP_IN_PATH:
raise ValueError("Empty value passed for a required argument.")

if doc_type is None:
doc_type = "_doc"

return self.transport.perform_request(
"POST" if id in SKIP_IN_PATH else "PUT",
_make_path(index, doc_type, id),
params=params,
headers=headers,
body=body,
)

把刚才从MongoDB中查询到的数据, 发送到ES的索引看看:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
>>> type(result[0])
<class 'dict'>
>>> es.index(index='rondo-temp-test', body=result[0])
Traceback (most recent call last):
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/serializer.py", line 130, in dumps
return json.dumps(
File "/usr/local/python3/lib/python3.9/json/__init__.py", line 234, in dumps
return cls(
File "/usr/local/python3/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/python3/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/serializer.py", line 116, in default
raise TypeError("Unable to serialize %r (type: %s)" % (data, type(data)))
TypeError: Unable to serialize ObjectId('649c11644226789ff4f682ae') (type: <class 'bson.objectid.ObjectId'>)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
return func(*args, params=params, headers=headers, **kwargs)
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/client/__init__.py", line 398, in index
return self.transport.perform_request(
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/transport.py", line 350, in perform_request
method, params, body, ignore, timeout = self._resolve_request_args(
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/transport.py", line 416, in _resolve_request_args
body = self.serializer.dumps(body)
File "/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/serializer.py", line 134, in dumps
raise SerializationError(data, e)
elasticsearch.exceptions.SerializationError: ({'_id': ObjectId('649c11644226789ff4f682ae'), 'groupId': '200', 'projectType': 4, 'version': 'this is a secret', 'role': '', 'platformBitTypesForNavigation': 0, 'platformBitTypesForMatching': 7, 'serverGuid': '01002405-76BD-4BFC-BCED-8DE4C8541AFA', 'zoneGuid': 'C6385685-74E9-4673-A696-F0EDF7040296', 'zoneExtendType': 4, 'zoneSearchKey': '3901862586:1', 'zoneSearchUniqueKey': 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467', 'zoneId': 0, 'ugcId': '3901862586', 'ugcVersionId': '1', 'ugcTitle': '测试数据', 'maxPlayerCount': 2, 'reservedCount': 0, 'connectionCount': 0, 'activeCount': 0, 'channelNo': 0, 'enterLimit': False, 'enterOwner': False, 'ownerMemberNo': 0, 'reservedUsers': [], 'enteredUsers': [], 'isCheckValid': True, 'createAt': datetime.datetime(2023, 6, 28, 10, 54, 28, 693000), 'lastTryEnterableCheckingAt': datetime.datetime(2023, 6, 27, 10, 54, 28, 693000)}, TypeError("Unable to serialize ObjectId('649c11644226789ff4f682ae') (type: <class 'bson.objectid.ObjectId'>)"))
>>>

出现报错了, 报错内容是: TypeError("Unable to serialize ObjectId('649c11644226789ff4f682ae').

我们可以看到, 在result[0]中的这条数据, 包含了一个MongoDB的类:

1
2
3
4
>>> result[0]['_id']
ObjectId('649c11644226789ff4f682ae')
>>> type(result[0]['_id'])
<class 'bson.objectid.ObjectId'>

ES无法处理这种BSON类, 本来是想着删除就完事的, 但是考虑到可以以文本的形式保留下来, 后面在排查业务问题的时候, 也可以通过这个_id字段定位到MongoDB的原始数据.

这次在pymongo部分使用find_one方法, 只查一条数据作为示例, 解决方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
>>> result = myclient['ZoneService']['zones'].find_one({})
>>> type(result)
<class 'dict'>
>>> result
{'_id': ObjectId('649c11644226789ff4f682ae'), 'groupId': '200', 'projectType': 4, 'version': 'this is a secret', 'role': '', 'platformBitTypesForNavigation': 0, 'platformBitTypesForMatching': 7, 'serverGuid': '01002405-76BD-4BFC-BCED-8DE4C8541AFA', 'zoneGuid': 'C6385685-74E9-4673-A696-F0EDF7040296', 'zoneExtendType': 4, 'zoneSearchKey': '3901862586:1', 'zoneSearchUniqueKey': 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467', 'zoneId': 0, 'ugcId': '3901862586', 'ugcVersionId': '1', 'ugcTitle': '测试数据', 'maxPlayerCount': 2, 'reservedCount': 0, 'connectionCount': 0, 'activeCount': 0, 'channelNo': 0, 'enterLimit': False, 'enterOwner': False, 'ownerMemberNo': 0, 'reservedUsers': [], 'enteredUsers': [], 'isCheckValid': True, 'createAt': datetime.datetime(2023, 6, 28, 10, 54, 28, 693000), 'lastTryEnterableCheckingAt': datetime.datetime(2023, 6, 27, 10, 54, 28, 693000)}
>>> result['mongo_id']
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
KeyError: 'mongo_id'
>>> result['mongo_id'] = str(result['_id'])
>>> result['mongo_id']
'649c11644226789ff4f682ae'
>>> type(result['mongo_id'])
<class 'str'>
>>> del(result['_id'])
>>> es.index(index='rondo-temp-test', body=result)
/root/rondo/zones-monitor/lib/python3.9/site-packages/elasticsearch/connection/base.py:190: ElasticsearchDeprecationWarning: index [rondo-temp-test] matches multiple legacy templates [default@template, scene@template], composable templates will only match a single template
warnings.warn(message, category=ElasticsearchDeprecationWarning)
{'_index': 'rondo-temp-test', '_type': '_doc', '_id': '0E3xZ4kBTkcA723hKye8', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}
>>>

成功!

整合

确定了可行性之后, 接下来就得梳理一下业务需求了:

  • 定期查询MongoDB中特定的集合, 把查询结果的所有文档保存到ES

  • 使用Python运行, 在Python中通过无限循环实现数据监控

  • 通过环境变量读取关键配置信息, 数据查询频率可调

  • 通过stdout输出关键信息

  • 考虑跨时区问题

配置参数

结合需求分析, 不难看出, 起码有四个可配置项, 分别设置成环境变量:

  • MongoDB数据库连接串: mongodb_url

  • ES数据库连接串: es_url

  • ES数据库索引名称: zone_monitor_index_name

  • 检查间隔: check_interval

在Python代码中, 我们可以通过os库获取环境变量:

1
2
3
>>> import os
>>> os.getenv('HOME')
'/root'

mongodb的timezone

眼尖的笔者还发现, 在上面的摸索过程中看到, MongoDB的数据中有部分datetime数据, 但是没有带时间戳.

当保存到ES之后, 在ES中记录如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
GET /rondo-temp-test/_doc/0E3xZ4kBTkcA723hKye8

{
"_index" : "rondo-temp-test",
"_type" : "_doc",
"_id" : "0E3xZ4kBTkcA723hKye8",
"_version" : 1,
"_seq_no" : 0,
"_primary_term" : 1,
"found" : true,
"_source" : {
"groupId" : "200",
"projectType" : 4,
"version" : "this is a secret",
"role" : "",
"platformBitTypesForNavigation" : 0,
"platformBitTypesForMatching" : 7,
"serverGuid" : "01002405-76BD-4BFC-BCED-8DE4C8541AFA",
"zoneGuid" : "C6385685-74E9-4673-A696-F0EDF7040296",
"zoneExtendType" : 4,
"zoneSearchKey" : "3901862586:1",
"zoneSearchUniqueKey" : "NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467",
"zoneId" : 0,
"ugcId" : "3901862586",
"ugcVersionId" : "1",
"ugcTitle" : "测试数据",
"maxPlayerCount" : 2,
"reservedCount" : 0,
"connectionCount" : 0,
"activeCount" : 0,
"channelNo" : 0,
"enterLimit" : false,
"enterOwner" : false,
"ownerMemberNo" : 0,
"reservedUsers" : [ ],
"enteredUsers" : [ ],
"isCheckValid" : true,
"createAt" : "2023-06-28T10:54:28.693000",
"lastTryEnterableCheckingAt" : "2023-06-27T10:54:28.693000",
"mongo_id" : "649c11644226789ff4f682ae"
}
}

可以看到, createAtlastTryEnterableCheckingAt这两个字段里面的时间戳, 都没有携带时区信息.

也可能是笔者接触过太多次涉及到跨时区的沟通, 也厌倦了跟一些地理知识欠缺的同事(主要是海外的)去解释这些问题, 每次设计类似的方案的时候, 都会习惯性考虑到这个问题. 解决思路也很简单, 所有datetime数据都带上timezone就可以了. 入库的时候, 不管是使用的北京时间还是伦敦时间, 都没关系, 关键是要在时间戳上面明确一下时区, 剩下的转换以及展示问题, 就交给程序.

查看了一下pymongo的关于datetime的文档我们可以知道, 默认情况下, MongoDB都是使用UTC时间存储datetime数据, 但不会显示时区. 如果想要显示时区, 则需要在客户端配置tz_aware.

又再看了一下具体的配置方法, 得知需要通过bson的库配置CodecOptions

最后综合文档里面的示例代码, 笔者测试了一下:

1
2
3
4
>>> from bson.codec_options import CodecOptions
>>> result2 = myclient['ZoneService']['zones'].with_options(codec_options=CodecOptions(tz_aware=True)).find_one({})
>>> result2
{'_id': ObjectId('649c11644226789ff4f682ae'), 'groupId': '200', 'projectType': 4, 'version': 'this is a secret', 'role': '', 'platformBitTypesForNavigation': 0, 'platformBitTypesForMatching': 7, 'serverGuid': '01002405-76BD-4BFC-BCED-8DE4C8541AFA', 'zoneGuid': 'C6385685-74E9-4673-A696-F0EDF7040296', 'zoneExtendType': 4, 'zoneSearchKey': '3901862586:1', 'zoneSearchUniqueKey': 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467', 'zoneId': 0, 'ugcId': '3901862586', 'ugcVersionId': '1', 'ugcTitle': '测试数据', 'maxPlayerCount': 2, 'reservedCount': 0, 'connectionCount': 0, 'activeCount': 0, 'channelNo': 0, 'enterLimit': False, 'enterOwner': False, 'ownerMemberNo': 0, 'reservedUsers': [], 'enteredUsers': [], 'isCheckValid': True, 'createAt': datetime.datetime(2023, 6, 28, 10, 54, 28, 693000, tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820>), 'lastTryEnterableCheckingAt': datetime.datetime(2023, 6, 27, 10, 54, 28, 693000, tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820>)}

好了, 现在在MongoDB的查询结果里面, datatime数据会有带上时区数据了. 然后笔者突然又想起来了, 好像是在初始化MongoClient的时候就有一个tz_aware的配置项, 看了一眼文档, 再试一下:

1
2
3
4
5
6
>>> myclient=pymongo.MongoClient(host=dburl,tz_aware=True)
>>> myclient
MongoClient(host=['10.3.2.26:27017', '10.3.2.24:27017', '10.3.2.4:27017'], document_class=dict, tz_aware=True, connect=True, replicaset='aaa111')
>>> result3 = myclient['ZoneService']['zones'].find_one({})
>>> result3
{'_id': ObjectId('649c11644226789ff4f682ae'), 'groupId': '200', 'projectType': 4, 'version': 'this is a secret', 'role': '', 'platformBitTypesForNavigation': 0, 'platformBitTypesForMatching': 7, 'serverGuid': '01002405-76BD-4BFC-BCED-8DE4C8541AFA', 'zoneGuid': 'C6385685-74E9-4673-A696-F0EDF7040296', 'zoneExtendType': 4, 'zoneSearchKey': '3901862586:1', 'zoneSearchUniqueKey': 'NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467', 'zoneId': 0, 'ugcId': '3901862586', 'ugcVersionId': '1', 'ugcTitle': '测试数据', 'maxPlayerCount': 2, 'reservedCount': 0, 'connectionCount': 0, 'activeCount': 0, 'channelNo': 0, 'enterLimit': False, 'enterOwner': False, 'ownerMemberNo': 0, 'reservedUsers': [], 'enteredUsers': [], 'isCheckValid': True, 'createAt': datetime.datetime(2023, 6, 28, 10, 54, 28, 693000, tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820>), 'lastTryEnterableCheckingAt': datetime.datetime(2023, 6, 27, 10, 54, 28, 693000, tzinfo=<bson.tz_util.FixedOffset object at 0x7f5a1ea4c820>)}

似乎都可以, 但是直接在MongoClient中设置, 代码可以精简一点. 也都可以入库ES:

1
2
3
4
5
6
7
8
9
>>> result2['mongo_id'] = str(result2['_id'])      
>>> del result2['_id']
>>> es.index(index='rondo-temp-test', body=result2)
{'_index': 'rondo-temp-test', '_type': '_doc', '_id': 'EFAsaIkBTkcA723hY6eu', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}
>>>
>>> result3['mongo_id'] = str(result3['_id'])
>>> del result3['_id']
>>> es.index(index='rondo-temp-test', body=result3)
{'_index': 'rondo-temp-test', '_type': '_doc', '_id': 'dFWOaIkBTkcA723hzm-1', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 5, '_primary_term': 1}

在ES中查询数据, 可以看到, 也能显示时区信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
GET /rondo-temp-test/_doc/EFAsaIkBTkcA723hY6eu
{
"_index" : "rondo-temp-test",
"_type" : "_doc",
"_id" : "EFAsaIkBTkcA723hY6eu",
"_version" : 1,
"_seq_no" : 1,
"_primary_term" : 1,
"found" : true,
"_source" : {
"groupId" : "200",
"projectType" : 4,
"version" : "this is a secret",
"role" : "",
"platformBitTypesForNavigation" : 0,
"platformBitTypesForMatching" : 7,
"serverGuid" : "01002405-76BD-4BFC-BCED-8DE4C8541AFA",
"zoneGuid" : "C6385685-74E9-4673-A696-F0EDF7040296",
"zoneExtendType" : 4,
"zoneSearchKey" : "3901862586:1",
"zoneSearchUniqueKey" : "NotUse:38ec66bd-2e6f-4c2f-a0e4-c004879ef467",
"zoneId" : 0,
"ugcId" : "3901862586",
"ugcVersionId" : "1",
"ugcTitle" : "测试数据",
"maxPlayerCount" : 2,
"reservedCount" : 0,
"connectionCount" : 0,
"activeCount" : 0,
"channelNo" : 0,
"enterLimit" : false,
"enterOwner" : false,
"ownerMemberNo" : 0,
"reservedUsers" : [ ],
"enteredUsers" : [ ],
"isCheckValid" : true,
"createAt" : "2023-06-28T10:54:28.693000+00:00",
"lastTryEnterableCheckingAt" : "2023-06-27T10:54:28.693000+00:00",
"mongo_id" : "649c11644226789ff4f682ae"
}
}


梳理Main函数

那接下来, 就可以整合出一个完整的python脚本了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# -*- coding: utf-8 -*-
'''
用于定时监控MongoDB中ZoneService.zones的变化, 并收录到ES

by rondochen
'''

from pymongo import MongoClient
from elasticsearch import Elasticsearch
from time import sleep
import os
import datetime
import pytz

def main():
# 连接MongoDB
myclient = MongoClient(host=os.getenv('mongodb_url'), tz_aware=True)

# 连接ES
es = Elasticsearch(hosts=os.getenv('es_url'))

while True:
# 记录一个collect_date(采集时间), 便于监控
collect_date = datetime.datetime.utcnow().replace(tzinfo=pytz.UTC)
print(collect_date)
try:
zones = myclient['ZoneService']['zones'].find({})
except:
print('some thing wrong about mongodb query')

for zone in zones:
zone['mongo_id'] = str(zone['_id'])
del zone['_id']
zone['timestamp'] = collect_date
try:
print(es.index(index=os.getenv('zone_monitor_index_name'),body=zone))
print('----')
except:
print('something wrong in es')
sleep(int(os.getenv('check_interval') or 30))

if __name__ == '__main__':
# 检查环境变量
if os.getenv('es_url') and os.getenv('mongodb_url')and os.getenv('zone_monitor_index_name'):
main()
else:
print('require environment variable $es_url and $mongodb_url $zone_monitor_index_name')
exit()

在运行之前, 使用shell的export命令, 把环境变量设置好, 就可以启动了, 启动起来, 会有这样的输出:

1
2
3
4
5
6
7
8
9
10
# python temp.py
2023-07-18 10:41:24.089384+00:00
{'_index': 'rondo-temp-test', '_type': '_doc', '_id': 'w1WXaIkBTkcA723hKMyT', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 9, '_primary_term': 1}
----
2023-07-18 10:41:54.150495+00:00
{'_index': 'rondo-temp-test', '_type': '_doc', '_id': 'xVWXaIkBTkcA723hndLq', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 10, '_primary_term': 1}
----
2023-07-18 10:42:24.187752+00:00
{'_index': 'rondo-temp-test', '_type': '_doc', '_id': '1VWYaIkBTkcA723hE9g_', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 2, 'failed': 0}, '_seq_no': 11, '_primary_term': 1}
----

在ES中检查相应的数据, 即可完成验收.

ES优化

写到这里, 可能已经有人忘了, 最初要采集MongoDB中的数据, 就是为了可视化用的. 为了能更好地满足可视化或者是业务跟踪的功能, MongoDB的数据在入库ES的时候, 是很有必要进行一番优化的. 否则, 我们可以预见一些字段类型不统一, 或者是ES查询上的性能/容量问题.

根据笔者的经验, 在规划ES的时候, 主要考虑的是:

  • shard/replica数量, 一般是ES集群有多少个节点就设置多少个shard, 然后replica设置成1.

  • 压缩, 只要系统性能没有太紧张, 都会设置best_compression, 节约存储空间.

  • 根据业务场景设置合适的刷新频率

  • 索引生命周期(ILM)

  • 字段类型设置(mapping), 结合实际业务场景考虑

Component Template

在笔者的这个例子中, 做的简单配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"template": {
"settings": {
"index": {
"codec": "best_compression",
"mapping": {
"total_fields": {
"limit": "2000"
}
},
"refresh_interval": "30s",
"number_of_shards": "3",
"max_docvalue_fields_search": "200"
}
},
"mappings": {...省略...}
},
"version": 20230705
}

ILM

在开发环境中, 一般会设置得比较随意:

  • 索引大小超过5GB或者索引创建7天后进行rollover

  • rollover60天后删除

如果是生产环境, 会再另外考虑

Index Template

笔者的经验是: 每个环境创建一个带有环境标识的Index Template, 引用同一个Component Template用于规范字段, 再按需引用ILM. 而在引用ILM的时候, 每个环境设置不同的rollover_alias, 后面带上环境标识.

创建初始索引

因为我们在上面做了ILM的设置, 为了能让索引能正确地rollover, 我们还需要创建一个初始化索引, 类似这样:

1
2
3
4
5
6
PUT zone-monitor-dev-000001
{
"aliases": {
"zone-monitor-dev": { }
}
}

这样, 我们的脚本可以向zone-watcher-dev输入数据, 而ES又可以根据索引的生命周期设置, 自动地触发rollover, 创建后续的zone-monitor-dev-000002, zone-monitor-dev-000003等索引.

使用docker部署

脚本开发完了, 也验证可以运行了, 甚至也开始在kibana做一些可视化设置了. 就开始考虑在各个环境部署了, 为了方便, 我们可以很直观地考虑到, 使用docker去部署.

在开始编写dockerfile之前, 先按照主流的做法, 把脚本里面用到的库导出来, 记录在requirements.txt.

可以使用pipreqs直接导出项目所需的库:

1
2
3
4
5
6
7
(zones-monitor) [root@VM-111-16-centos tmp]# pip install pipreqs
(zones-monitor) [root@VM-111-16-centos tmp]# pipreqs .
INFO: Successfully saved requirements file in ./requirements.txt
(zones-monitor) [root@VM-111-16-centos tmp]# cat requirements.txt
elasticsearch==7.10.0
pymongo==4.3.3
pytz==2023.3

dockerfile

对于这种简单的运维脚本, 几行搞定:

1
2
3
4
5
6
7
# syntax=docker/dockerfile:1
from python:3.9-alpine
WORKDIR /app
COPY requirements.txt requirements.txt
RUN pip3 install -r requirements.txt
COPY zone-monitor.py zone-monitor.py
CMD ["python3", "zone-monitor.py"]

构建命令:

1
docker build -t zone-monitor:1.0 -f zone-monitor.dockerfile .

通过外部文件记录正确的配置内容, 直接启动:

1
2
3
4
5
6
7
# cat dev.env 
es_url=http://elastic:******@10.3.2.37:9200/
mongodb_url=mongodb://mongouser:xxxxxx@10.3.2.26:27017,10.3.2.4:27017,10.3.2.24:27017/?replicaSet=aaa111
zone_monitor_index_name=zone-monitor-dev
check_interval=60

docker run -d --env-file ./dev.env --name zone-monitor-dev zone-monitor:1.0

docker compose

本来做到这里就差不多了, 但在笔者真正工作的项目中, 这个脚本还衍生出了很多雷同的, 监控不同的MongoDB数据库里面的内容, 每个环境都有好几个脚本要启动, 所以就会用到docker compose. 这样, 在给各个环境部署的时候, 只需要拿到docker compose的yaml文件, 再写好正确的环境变量, 就可以很规范地完成启动.

1
2
3
4
5
6
7
8
9
10
services:
zone-monitor-dev:
build:
context: .
dockerfile: zone-monitor.dockerfile
image: zone-monitor:1.0
env_file:
- dev.env
container_name: zone-monitor-dev
...省略下面的多个service...

启动命令:

1
docker compose -f docker-compose-dev.yml up -d

后记

通过一次方案设计, 聊了一下笔者的一些工作上的片段. 总的来说, 就是围绕着把MongoDB中的数据收录到ES中这个需求, 最后衍生出来的一系列工作, 包括:

  • 明确需求

  • 调研可行性

  • 编写代码

  • 反复打磨

  • 部署

  • 优化

这类型的项目, 在后续的维护中, 往往还需要关注随着版本迭代而引起的MongoDB数据变化, ES的存储空间/性能, 以及可视化设置等.

笔者一直认为, 技术都是为具体业务而服务的, 就像本例中这种简单的业务场景, 就没有非要凹造型弄出一些面向对象的复杂代码. 最后进行容器化改造, 也只是为了应付多环境部署这个场景.

本文中涉及的业务场景乃至代码, 对其他人来说估计也没有什么实质的参考价值, 但可以通过整一个流程, 大概感受一下作为业务运维, 在设计一个监控方案时候的一些思路.

参考文档

https://www.elastic.co/guide/en/elasticsearch/reference/7.10/indices-component-template.html

https://pymongo.readthedocs.io/en/stable/examples/datetimes.html

pymongo和ES的很多问题, 官方文档都有答案.