RikoPython 流处理引擎
Riko是一款Python 流处理引擎,类似Yahoo Pipes。采用纯python开发,用于分析处理结构化数据流。拥有同步和异步APIs,同时也支持并行RSS feeds。Riko也支持字符终端界面。
功能特性:
-
可读取csv/xml/json/html文件。
-
通过模块化的管道可创建文本流和数据流。
-
可解析、处理、提取RSS/Atom feeds。
-
可创建强大的混合型APIs和maps。
-
支持并行处理。
使用示例代码:
>>> ### Create a SyncPipe flow ### >>> # >>> # `SyncPipe` is a convenience class that creates chainable flows >>> # and allows for parallel processing. >>> from riko.collections.sync import SyncPipe >>> >>> ### Set the pipe configurations ### >>> # >>> # Notes: >>> # 1. the `detag` option will strip all html tags from the result >>> # 2. fetch the text contained inside the 'body' tag of the hackernews >>> # homepage >>> # 3. replace newlines with spaces and assign the result to 'content' >>> # 4. tokenize the resulting text using whitespace as the delimeter >>> # 5. count the number of times each token appears >>> # 6. obtain the raw stream >>> # 7. extract the first word and its count >>> # 8. extract the second word and its count >>> # 9. extract the third word and its count >>> url = 'https://news.ycombinator.com/' >>> fetch_conf = { ... 'url': url, 'start': '<body>', 'end': '</body>', 'detag': True} # 1 >>> >>> replace_conf = { ... 'rule': [ ... {'find': '\r\n', 'replace': ' '}, ... {'find': '\n', 'replace': ' '}]} >>> >>> flow = ( ... SyncPipe('fetchpage', conf=fetch_conf) # 2 ... .strreplace(conf=replace_conf, assign='content') # 3 ... .stringtokenizer(conf={'delimiter': ' '}, emit=True) # 4 ... .count(conf={'count_key': 'content'})) # 5 >>> >>> stream = flow.output # 6 >>> next(stream) # 7 {"'sad": 1} >>> next(stream) # 8 {'(': 28} >>> next(stream) # 9 {'(1999)': 1}
评论
Arroyo分布式流处理引擎
Arroyo 是一个用Rust编写的分布式流处理引擎,旨在有效地对数据流执行有状态计算。与传统的批处理不同,流引擎可以在有界和无界源上运行,并在结果可用时立即发出。使用SQL转换、过滤、聚合和连接Ka
Arroyo分布式流处理引擎
0
FaustPython 流处理
Faust是一个流处理库,将想法从KafkaStreams移植到Python。它在Robinhood用于构建高性能的分布式系统和实时数据管道,每天处理数十亿个事件。Faust提供流处理和事件处理,与K
FaustPython 流处理
0
SkiaGoogle 图形处理引擎
skia是个2D向量图形处理函数库,包含字型、座标转换,以及点阵图都有高效能且简洁的表现。不仅用于GoogleChrome浏览器,新兴的Android开放手机平台也采用skia作为绘图处理,搭配Ope
SkiaGoogle 图形处理引擎
0