主页 > imtoken地址是什么 > PyFlink + 区块链?行业领导者 BTC.com 如何实现实时计算
PyFlink + 区块链?行业领导者 BTC.com 如何实现实时计算
总结:
大家好,我们是 BTC.com 团队。 2020 年,我们有幸进入了 Flink 和 PyFlink 生态系统。我们从团队自身需求出发,改进团队内部实时计算的任务和需求,构建了流批计算环境。
在实现实时计算的过程中,我们在实践中积累了一些经验,在此分享一下我们在这方面的一些心路历程。
0x01 目录
zeppelin、PyFlink on k8s 等实践
区块链领域实践前景•总结0x02困惑•描述•思考•行动
作为工程师,我们每天都在不断了解需求并发展业务。
有一天,我们参加了一个团队总结会议并收到了以下请求:
销售总监 A:
我们想知道销售历史和实时的转化率,销售,能不能统计实时的TOP5产品,有大促销的时候,用户的实时访问和实时查看TOP5产品可以根据他的历史访问记录。实时推荐相关产品。
营销总监 B:
我们想知道营销推广的效果,每个事件的实时数据,否则我们的市场发布无法准确评估效果,及时反馈。
研发总监 C:
部分用户的错误无法重现。日志可以更实时吗?传统的日志分析需要一定的梳理。可以直接清理/处理相关数据吗?
采购总监D:
近年来数字化流行吗?采购方希望预测采购需求,对支出进行实时分类管理,预测未来供应来源,提高成本。有没有办法做到这一点?还有一些供应商不是很稳定。你能监控他们的情况吗?
运维总监E:
有时候网站访问很慢,没有地方可以看到实时机器状态。有没有办法解决这个问题?
部门领导F:
以上人群的需求能否得到满足?
做了以上了解后发现,用户对数据需求的欲望不仅需要历史数据,还需要实时数据。
在电商、金融、制造等行业,数据快速增长,很多企业都面临着新的挑战。数据分析的实时处理框架,如制作一些实时数据分析报告、实时数据处理计算等。
与大多数公司类似,我们之前没有实时计算经验或积累。这时候我开始迷茫了,如何更好地满足上述要求,在成本和效果之间取得平衡,以及如何设计相关的架构。
当我们贫穷时,情况会发生变化。迷茫之后,我们开始梳理现有的条件和需要的东西。
首先,我们的业务范围主要是区块链浏览器和数据服务、区块链矿池、多币种钱包等。在区块链浏览器业务中,BTC.com是目前全球领先的区块链数据服务平台,矿池业务位居行业第一,区块链浏览器也是全球前三大浏览器之一。
首先,我们使用解析器解析区块链上的数据,获取各种数据信息,可以分析各个币种的地址活动、地址交易状态、交易流向、参与程度等。 目前BTC.com区块链浏览器与业内各大矿池、交易所都有相关合作,可以更好地实现部分数据的统计、整理、归纳、输出。
对于用户来说,不仅有专业的区块链开发者,还有各种b端和c端用户。 C端用户可以标记区块链地址、运行智能合约、查看智能合约相关内容等,以及对链上数据的检索和查看。 B端用户有更专业的支持和指导,提供API、区块链节点定制、交易加速、链上业务合作、数据定制等。
从数据量级来看,截至目前,比特币大约有5亿笔交易,超过3000万个地址,22亿输出(输出:每笔交易的输出),并且还在不断增长。在以太坊的情况下,更多。 BTC.com的矿池和区块链浏览器都支持多币种,每种币种的总数据量在几十TB左右。
矿池是矿工购买矿机后接入的服务平台。矿工通过接入矿池可以获得更稳定的收益。这是一项需要7*24小时稳定的服务。在其中,矿工不断提交自己计算出来的矿池下发任务的解,矿池会广播达到网络难度的解。这个过程也可以被认为是几乎实时的。矿工提交给服务端,服务端提交给kafka消息队列。同时,一些组件会监听这些消息以进行消费。这些提交的解决方案可以用来分析矿机的工作状态、算力、连接状态等。
在业务中,我们需要计算历史数据和实时数据。
历史数据需要关联一些币价和历史交易信息,而这些交易信息需要一直保存,这是典型的批处理任务。
每当确认一个新区块时,都会有一些数据可以被处理和分析。例如,如果某个地址在这个块中有一笔交易,那么可以从它的交易流中分析出来。此类交易,挖掘交易相关性。或者这个区块里面有一些特殊的交易,比如segwit交易,比如闪电网络交易,也就是这个币有一些独特的东西是可以分析统计的。并且新区块被确认时的难度预测也会发生变化。
还有大额交易的监控。通过新区块的确认和未确认交易,锁定一些大额交易,结合地址的一些注解,锁定交易流向,数据分析更好。
区块链也有一些 OLAP 要求。
在总结了数据统计的需求和问题后,我们开始思考:什么结构最合适,如何减少人员参与,成本低。
解决问题无非就是提出假设,通过测量,刷新认知。
浏览了一些资料,我们认为大部分的计算框架都是输入、处理、然后输出的。首先,我们需要获取数据,其中数据可以来自 MySQL 或 Kafka,然后进行计算。这里的计算可以是聚合或TOP 5类型。在实时中,可能有窗口类型。计算完成后,将结果分发,发送到消息通道和存储,发送到微信或钉钉,实现到MySQL。
团队一开始尝试spark,搭建yarn,使用airflow作为调度框架,通过集成导入mysql开发了一些批处理任务。具有离线任务的特点,数据固定,量大。 ,计算周期长,需要做一些复杂的运算。
在一些批处理任务中,这种架构是稳定的,但是随着业务的发展,对实时性的要求越来越高,实时数据并不能保证按顺序到达。按时间戳排序,消息的时间字段允许前后有间隔。数据模型方面,需求驱动的开发成本比较高,当时spark方式比较高,对state的处理不是很好,影响了部分效率。
其实2019年我也在调查一些实时计算的事情,关注Flink框架。那个时候还是java为主。整体框架概念与 Spark 不同。是一个特殊的流,但是因为团队在java方面没有基因和沉淀,使用Flink作为实时计算的架构在那个时候已经告一段落。 2020 年初,无论是阿里云、infoq,还是 B 站,PyFlink 都在推广,当时尤其是程和群和孙金成的视频,以及孙金成老师的博客,印象非常深刻。所以想尝试一下PyFlink,它的优势是流和批合一,未来也支持Python、pandas,甚至是tensorflow和keras的一些功能,对我们非常有吸引力。之后,我们正在考虑我们在 PyFlink 上的流-批集成架构。
0x03 流批集成架构
首先,我们需要对数据进行梳理,了解数据的来源。在 spark-based 期间,数据是从数据源定期加载(增量)数据,通过一定的转换逻辑,然后写入目的地。由于数据量和业务需要,延迟通常在小时级别,而实时,则要求尽可能短的延迟,因此对数据源进行了分类,分为几个部分。一部分是我们存储在 MySQL 中用于持久性的传统数据。这部分可以直接作为batch计算使用,也可以导入hive。 ,做进一步的计算。在实时部分,其实有很多想法。一种是通过MySQL的binlog来分析Btc区块链浏览器打不开,另一种是MySQL的cdc函数。经过多方考虑,我们最终选择了 Kafka,不仅是因为它是一个优秀的分布式 Streaming 平台,而且团队也有它的技术沉淀。
其实在本地开发的时候,安装Kafka也比较方便。只需要brew install kafka,通过conduktor客户端就可以很方便的看到各个topic的情况。因此,对现有的 Parser 进行了修改以支持 Kafka。当收到一个新块时,它会立即向 Kafka 发送消息,然后进行处理。
2018 年左右,团队将整体业务迁移到 Kubernetes。在业务发展的过程中,为开发和运维减轻了很多负担,所以建议有一定规模的业务,最好迁移到kubernetes。它的成本优化、DevOps 和高可用性支持是其他平台和传统方法无法比拟的。
在开发工作的过程中,我们尽量使用Flink SQL,同时结合一些Java和Python的UDF、UDAF、UDTF。每个作业通过初始化类似于以下的语句来形成特定的模式:
self.source_ddl = '''
CREATE TABLE SourceTable (xxx int) WITH
'''
self.sink_ddl = '''
CREATE TABLE SinkTable (xxx int) WITH
'''
self.transform_ddl = '''
INSERT INTO SinkTable
SELECT udf(xxx)
FROM SourceTable
GROUP BY FROM_UNIXTIME(`timestamp`, 'yyyyMMdd')
'''
未来数据将按照ODS、DWD、DWS进行分层,行业常用的ADS分为原始层、明细层和汇总层,进一步完善数据治理。
效果
最终,我们团队基于 PyFlink 开发快速完成了现有的任务,其中一些是批处理作业,处理这几天的数据,还有一些是实时作业。根据Kafka消息消费,目前比较稳定。
部署时选择了kubernetes,下面分享。在k8s中部署jobmanager和taskmanager,使用kubernetes的job函数作为批处理作业的部署,然后考虑连接一些监控平台,比如Prometheus。
在成本方面,由于使用了kubernetes集群,所以只有在机器上扩展主机的成本。这样成本比传统的yarn部署方式要低,kuberntes后期会支持Native部署,扩展jobmanager和taskmanager会更方便。
0x04 Zeppelin、PyFlink on k8s 等实践
Zeppelin被我们用于数据探索和逻辑验证,部分数据在本地不是真实数据,使用Zeppelin连接实际链上数据,进行计算的逻辑验证,验证完成后,可以转换成生产和部署所需的代码。
一、在 kubernetes 上构建 PyFlink 和 Zeppelin
完成后的部署Demo在github上,可以参考这里的配置文件修改
1).flink-conf.yaml
taskmanager.numberOfTaskSlots: 10
这里可以调整Taskmanager可以运行的作业数量
2).zeppelin-site.xml
cp conf/zeppelin-site.xml.template conf/zeppelin-site.xml; \
sed -i 's#127.0.0.1 #0.0.0.0 #g' conf/zeppelin-site.xml; \
sed -i 's#auto #local #g' conf/zeppelin-site.xml
Zeppelin 访问代理
nginx.ingress.kubernetes.io/configuration-snippet: |
proxy_set_header Upgrade "websocket";
proxy_set_header Connection "Upgrade";
Zeppelin需要在浏览器中与服务器建立socket连接,并且需要在ingress中添加websocket配置。
Flink 和 Zeppelin 数据持久化
- mountPath: /opt/flink/lib
- mountPath: /zeppelin/notebook/
PyFlink 在本地提交作业
1).local 安装 PyFlink
$ pip3 install apache-flink==1.11.1
2)。测试演示
def word_count():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
)
sink_ddl = """
create table Results (word VARCHAR, `count` BIGINT) with ( 'connector' = 'print')
"""
t_env.sql_update(sink_ddl)
elements = [(word, 1) for word in content.split(" ")]
# 这里也可以通过 Flink SQL
t_env.from_elements(elements, ["word", "count"]) \
.group_by("word") \
.select("word, count(1) as count") \
.insert_into("Results")
t_env.execute("word_count")
if __name__ == '__main__':
logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
word_count()
或者实时处理的Demo:
def handle_kafka_message():
s_env = StreamExecutionEnvironment.get_execution_environment()
# s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)
st_env = StreamTableEnvironment \
.create(s_env, environment_settings=EnvironmentSettings
.new_instance()
.in_streaming_mode()
.use_blink_planner().build())
source_ddl = '''
CREATE TABLE SourceTable (
word string
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'Topic',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.zookeeper.connect' = 'localhost:2121',
'format.type' = 'json',
'format.derive-schema' = 'true'
)
'''
sink_ddl = """
create table Results (word VARCHAR) with ('connector' = 'print')
"""
st_env.sql_update(sink_ddl)
st_env.sql_update(source_ddl)
st_env.from_path("source").insert_into("sink")
st_env.execute("KafkaTest")
if __name__ == '__main__':
handle_kafka_message()
本地测试 Flink 提交作业
$ flink run -m localhost:8081 -py word_count.py
python/table/batch/word_count.py
Job has been submitted with JobID 0a31b61c2f974bcc3f344f57829fc5d5
Program execution finished
Job with JobID 0a31b61c2f974bcc3f344f57829fc5d5 has finished.
Job Runtime: 741 ms
PyFlink local 提交一个Python Job,相关代码打包的地方。
$ zip -r flinkdemo.zip ./*
$ flink run -m localhost:8081 -pyfs flinkdemo.zip -pym main
Kubernetes 通过集群本身的作业功能提交作业,然后开发一些 UI 后台接口用于作业管理和监控。 0x05 区块链领域实践
随着区块链技术越来越成熟,应用越来越多,行业规范化、标准化的趋势也开始显现,越来越依赖云计算,大数据毕竟是数字经济的产物。 BTC.com也植根于区块链技术基础设施,为各类公司、各类应用提供数据和业务支持。
近年来,一个词在IT行业流行起来。中台,无论是大公司还是初创公司,都喜欢讲这个概念,自称是业务中台、数据中台等。在我们的理解中,中台是一种能力从传统的单兵作战,到完善武器装备后勤保障,提高作战能力,全方位整合资源。打破数据中的数据孤岛,在快速变化的前端和日益稳定的后端之间取得平衡。在中国大陆和台湾更重要的是服务,最终还是要回馈客户和合作伙伴。
在区块链领域,BTC.com拥有深厚的行业技术积累,能够提供全方位的数据化能力。比如用机器学习估计链上的数据,估计eth的gas价格,最好的手续费等等,利用keras深度学习的能力进行一些回归计算,然后用Flink,机器学习结合区块链,为预测和标准化分类提供更多的数据样本。以前,该模型是通过定时任务不断训练的。与 Flink 结合后,实时性会更高。对此,未来会提供更多的话题,比如币价与Defi、舆论、市场等的关系,以及区块链地址和交易的标注和分类。即便是机器学习训练出来的模型,也放在 IPFS 网络中,通过去中心化代币进行训练,提供了轻松调用样本和模型的能力。
目前,BTC.com已经通过数据挖掘推出了一些能力Btc区块链浏览器打不开,包括交易推送、OLAP链上分析报告等,以改善和提升相关行业和开发者的实际体验。我们在各个链上都有监控节点来监控每个区块链网络的可用性和去中心化,并监控智能合约。在访问一些联盟链和隐私加密货币时,可以为联盟链和隐私加密货币提供这方面的数据能力。 BTC.com将以科技公司的本质,以技术发展为第一动力,以市场和客户为导向,开发创新和集成应用,良好的基础设施,为区块链产业生态发展做出更多努力。
0x06 展望/摘要
从实时计算趋势到流批一体化架构,通过对PyFlink和Flink的学习,各种作业任务已经稳定在线运行,对接实际业务需求。并搭建了Zeppelin平台,让业务发展更加便捷。计算上尽量依赖SQL,方便各方面的集成和调试。
在社区方面,PyFlink 并没有让我们失望,它的快速响应能力和不断完善的文档。还可以看到一些关于 Confluence 的 Flink Improvement Proposals,其中一些与 PyFlink 相关。近期还将支持Pandas UDAF、DataStream API、ML API,未来有望支持Joblistener。总之,在这里也感谢相关团队。
未来的展望,总结起来就是通过业务实现数据的价值。数据中心的最终目的是实现数据。