datacube数据立方体
数据立方体是复杂计算的抽象。Datacube 是用 Java 实现的,可插入数据库后端支持的数据立方体。
datacube 是用来存储大数据点的聚合信息。数据立方体存储的是有趣输入数据点的子集。比如,你正在编写一个 web 服务器日志分析工具,你的输入点可能是日志行,你可能会计算每个浏览器的类型,每个浏览器的版本,操作系统类型,操作系统版本和其他属性。同时你可能会需要计算一个特定的组合计数(浏览器类型,浏览器版本,操作系统类型), (浏览器类型,浏览器版本,操作系统类型,操作系统版本),等等。
这对快速添加和修改计数是个很大的挑战,会浪费很多时间在数据库代码和重新用新计数器处理旧数据。而数据立方体就可以帮忙解决这些问题。
Urban Airship 使用 datacube 项目来支持他们的移动端应用的分析栈,每个节点每秒处理大约 10 K 的事件。
datacube 要求 JDK 1.6。
特性
性能: 高速异步 IO 后端处理
使用 Hadoop MapReduce 进行批量加载
可插入数据库接口
datacube 暂时只支持 HBase 数据库后端。
示例:
IdService idService = new CachingIdService(5, new MapIdService()); ConcurrentMap<BoxedByteArray,byte[]> backingMap = new ConcurrentHashMap<BoxedByteArray, byte[]>(); DbHarness<LongOp> dbHarness = new MapDbHarness<LongOp>(backingMap, LongOp.DESERIALIZER, CommitType.READ_COMBINE_CAS, idService); HourDayMonthBucketer hourDayMonthBucketer = new HourDayMonthBucketer(); Dimension<DateTime> time = new Dimension<DateTime>("time", hourDayMonthBucketer, false, 8); Dimension<String> zipcode = new Dimension<String>("zipcode", new StringToBytesBucketer(), true, 5); DataCubeIo<LongOp> cubeIo = null; DataCube<LongOp> cube; Rollup hourAndZipRollup = new Rollup(zipcode, time, HourDayMonthBucketer.hours); Rollup dayAndZipRollup = new Rollup(zipcode, time, HourDayMonthBucketer.days); Rollup hourRollup = new Rollup(time, HourDayMonthBucketer.hours); Rollup dayRollup = new Rollup(time, HourDayMonthBucketer.days); List<Dimension<?>> dimensions = ImmutableList.<Dimension<?>>of(time, zipcode); List<Rollup> rollups = ImmutableList.of(hourAndZipRollup, dayAndZipRollup, hourRollup, dayRollup); cube = new DataCube<LongOp>(dimensions, rollups); cubeIo = new DataCubeIo<LongOp>(cube, dbHarness, 1, Long.MAX_VALUE, SyncLevel.FULL_SYNC); DateTime now = new DateTime(DateTimeZone.UTC); // Do an increment of 5 for a certain time and zipcode cubeIo.writeSync(new LongOp(5), new WriteBuilder(cube) .at(time, now) .at(zipcode, "97201")); // Do an increment of 10 for the same zipcode in a different hour of the same day DateTime differentHour = now.withHourOfDay((now.getHourOfDay()+1)%24); cubeIo.writeSync(new LongOp(10), new WriteBuilder(cube) .at(time, differentHour) .at(zipcode, "97201")); // Read back the value that we wrote for the current hour, should be 5 Optional<LongOp> thisHourCount = cubeIo.get(new ReadBuilder(cube) .at(time, HourDayMonthBucketer.hours, now) .at(zipcode, "97201")); Assert.assertTrue(thisHourCount.isPresent()); Assert.assertEquals(5L, thisHourCount.get().getLong()); // Read back the value we wrote for the other hour, should be 10 Optional<LongOp> differentHourCount = cubeIo.get(new ReadBuilder(cube) .at(time, HourDayMonthBucketer.hours, differentHour) .at(zipcode, "97201")); Assert.assertTrue(differentHourCount.isPresent()); Assert.assertEquals(10L, differentHourCount.get().getLong()); // The total for today should be the sum of the two increments Optional<LongOp> todayCount = cubeIo.get(new ReadBuilder(cube) .at(time, HourDayMonthBucketer.days, now) .at(zipcode, "97201")); Assert.assertTrue(todayCount.isPresent()); Assert.assertEquals(15L, todayCount.get().getLong());
评论