C#: Can an observable stream be partitioned by a timestamp, similar to Buffer or Window?
I have a stream of objects, each of which contains a timestamp. I want to partition this stream into non-overlapping windows, in a similar fashion to Observable.Buffer or Observable.Window. However, I want a window or buffer to close when the timestamp of an object exceeds a threshold, rather than when a real-time threshold is exceeded.
For example, suppose I want to partition the data into 30-second windows/buffers, and the first object has a timestamp of 00:00:00. When I reach an object whose timestamp exceeds 00:00:30, I want the window/buffer to close and the cycle to start again. In this way, objects are grouped into the appropriate buckets based on their own timestamps.
The existing Buffer and Window operators come close, but not quite. For example, if I did this:
    mySource.Window(TimeSpan.FromSeconds(30))
then I would get all of the objects that the subscription encounters within a 30-second window. The problem is that the data is partitioned by a real-time 30-second window, rather than by a window based on the timestamp of the object itself.
I guess this requires me to implement an appropriate windowClosingSelector Func, but I'm having difficulty getting it to work.
Yes, you can do this quite easily.
Edit: I've revised the solution to use a selector function to obtain the timestamp ticks. This allows any source element type to be supplied.
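Any solution needs to know how to get at the timestamp in order to work with it. I imagine you have your own element type in mind, but for the sake of argument let's assume the elements are typed as System.Reactive.Timestamped<int>. The test example uses this type, but the solution below will work with any type from which you can obtain the ticks of a timestamp.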
The following extension method creates windows according to the supplied windowDuration:
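    using System;
    using System.Reactive.Linq;

    public static class ObservableExtensions
    {
        public static IObservable<IGroupedObservable<long, TSource>> WindowByTimestamp<TSource>(
            this IObservable<TSource> source,
            Func<TSource, long> timestampTicksSelector,
            TimeSpan windowDuration)
        {
            long durationTicks = windowDuration.Ticks;

            // Publish shares one source subscription between the groups and their duration selectors.
            return source.Publish(ps =>
                ps.GroupByUntil(
                    // Window key: how many whole durations fit into the timestamp.
                    x => timestampTicksSelector(x) / durationTicks,
                    // Close a group as soon as an element with a different key arrives.
                    g => ps.Where(x => timestampTicksSelector(x) / durationTicks != g.Key)));
        }
    }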
How it works
The trick is to see this as a grouping operation. We create a group key from the integer division of the element's timestamp by our window duration. I use the ticks of both the duration and the timestamp for convenience.
The timestamp ticks are obtained via the supplied selector function.
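To make the key arithmetic concrete, here is a small sketch using plain LINQ over an array instead of Rx, so it runs without any NuGet packages. It assumes the same tick values and 200-tick duration as the test further down; `WindowKey` is a hypothetical helper name, not part of Rx.

```csharp
using System;
using System.Linq;

// Hypothetical helper: the window key is the number of whole window
// durations contained in the timestamp (integer division on ticks).
static long WindowKey(long timestamp, long duration) => timestamp / duration;

// Same timestamps (in ticks) and window duration as the Rx test.
long[] timestamps = { 100, 200, 300, 400, 450, 455 };
long durationTicks = TimeSpan.FromTicks(200).Ticks;

long[] keys = timestamps.Select(t => WindowKey(t, durationTicks)).ToArray();

// Prints: 0, 1, 1, 2, 2, 2
Console.WriteLine(string.Join(", ", keys));
```

Elements that share a key belong to the same window, which is exactly what the key selector passed to GroupByUntil computes.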
Now, to follow Window behaviour, we must expect the timestamps to form a monotonically increasing sequence; that is, each timestamp is equal to or later than the preceding one. We should check this constraint and treat a violation as an error* (see the note on this below, and the additional code at the end).
So, in order to accomplish this, we must close each group when a new group starts. Since increasing timestamps are assumed, we only need GroupByUntil's duration function to watch for the arrival of an element with a new key, and close the group when it appears. There is therefore only ever one active group: the current window.
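The "one open window at a time" idea can be sketched as a pull-based analogue over IEnumerable. This is only an illustration of the logic, not the Rx implementation; `WindowByTicks` is a hypothetical name, and the sketch assumes non-decreasing timestamps just as the Rx version does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Pull-based analogue of the GroupByUntil approach (illustration only):
// a single window is open at a time, and it is closed as soon as an element
// with a different key arrives. Assumes non-decreasing timestamps.
static IEnumerable<(long Key, List<long> Items)> WindowByTicks(
    IEnumerable<long> timestampTicks, long durationTicks)
{
    List<long>? current = null;
    long currentKey = 0;
    foreach (var t in timestampTicks)
    {
        long key = t / durationTicks;
        if (current != null && key != currentKey)
        {
            // A new key closes the active window.
            yield return (currentKey, current);
            current = null;
        }
        if (current == null)
        {
            current = new List<long>();
            currentKey = key;
        }
        current.Add(t);
    }
    // Source completed: close the last open window, if any.
    if (current != null)
        yield return (currentKey, current);
}

var windows = WindowByTicks(new long[] { 100, 200, 300, 400, 450, 455 }, 200).ToList();

foreach (var (key, items) in windows)
    Console.WriteLine($"Key {key}: {string.Join(", ", items)}");
```

As in the Rx version, an empty window never produces a group; the key simply jumps.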
Note
*If the timestamps can arrive out of order, you can use GroupBy instead. You won't need the Publish mechanism or the duration function of GroupByUntil, but note that the groups will only complete when the source stream completes. You can still use the group key to report the window.
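To illustrate why plain grouping tolerates out-of-order input, here is a LINQ-to-objects sketch with a hypothetical shuffled version of the test timestamps. With LINQ the groups are simply materialized; the Rx GroupBy caveat above (groups complete only when the source completes) does not show up here.

```csharp
using System;
using System.Linq;

// With GroupBy, arrival order doesn't matter: every element lands in the
// bucket for its own key. Hypothetical out-of-order timestamps (ticks).
long[] outOfOrder = { 300, 100, 455, 200, 400, 450 };
long durationTicks = 200;

var buckets = outOfOrder
    .GroupBy(t => t / durationTicks)
    .OrderBy(g => g.Key)
    .ToList();

foreach (var g in buckets)
    Console.WriteLine($"Key {g.Key}: {string.Join(", ", g)}");
```

Within each bucket the elements keep their arrival order, so sort them afterwards if you need them in timestamp order.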
On a related note, the return type of WindowByTimestamp is IObservable<IGroupedObservable<long, TSource>>, with long as the key type. This gives you access to the Key property in subsequent operations. In the test below, I used the indexed overload of SelectMany to create a window number, but using the Key property gives more flexibility, since the key can be any long that distinguishes the windows. In this case it is an increasing sequence starting at an arbitrary-looking number: the number of times the duration tick count divides into the timestamp tick count. Note that since windows can be empty, the step sizes between keys vary too.
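The difference between the key and a sequential window number can be sketched with LINQ. The timestamps below are hypothetical, chosen so that some windows are empty; the index plays the role of the `i` that the indexed SelectMany overload supplies in the Rx test.

```csharp
using System;
using System.Linq;

// Hypothetical timestamps (ticks) chosen so that some windows are empty.
long[] timestamps = { 100, 150, 700, 1000 };
long durationTicks = 200;

// Pair each group's key with a sequential window number (its index).
var windows = timestamps
    .GroupBy(t => t / durationTicks)
    .Select((g, index) => (WindowNumber: index, Key: g.Key, Count: g.Count()))
    .ToList();

foreach (var w in windows)
    Console.WriteLine($"Window {w.WindowNumber}: key {w.Key}, {w.Count} element(s)");
```

The window numbers run 0, 1, 2, while the keys run 0, 3, 5, so the gaps in the keys reveal the empty windows that the index hides.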
Test
Here's a test to see it working. To be able to use it, you'll need to include the NuGet package Rx-Testing:
    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using Microsoft.Reactive.Testing;

    public class Tests : ReactiveTest
    {
        public void Scenario1()
        {
            var scheduler = new TestScheduler();

            var live = scheduler.CreateHotObservable<Timestamped<int>>(
                OnNext(100, Timestamped.Create(1, new DateTimeOffset(100, TimeSpan.Zero))),
                OnNext(101, Timestamped.Create(1, new DateTimeOffset(200, TimeSpan.Zero))),
                OnNext(102, Timestamped.Create(2, new DateTimeOffset(300, TimeSpan.Zero))),
                OnNext(103, Timestamped.Create(2, new DateTimeOffset(400, TimeSpan.Zero))),
                OnNext(104, Timestamped.Create(3, new DateTimeOffset(450, TimeSpan.Zero))),
                OnNext(105, Timestamped.Create(3, new DateTimeOffset(455, TimeSpan.Zero))),
                OnCompleted<Timestamped<int>>(105));

            var windows = live.WindowByTimestamp(
                x => x.Timestamp.Ticks,
                TimeSpan.FromTicks(200));

            // The indexed SelectMany overload numbers the windows 0, 1, 2, ...
            var numberedWindows = windows.SelectMany((x, i) =>
                x.Select(y => new
                {
                    WindowNumber = i,
                    Timestamp = y.Timestamp,
                    Value = y.Value
                }));

            numberedWindows.Subscribe(x => Console.WriteLine(
                "Window: {0}, Time: {1} Value: {2}",
                x.WindowNumber, x.Timestamp.Ticks, x.Value));

            scheduler.Start();
        }
    }
The output is:
    Window: 0, Time: 100 Value: 1
    Window: 1, Time: 200 Value: 1
    Window: 1, Time: 300 Value: 2
    Window: 2, Time: 400 Value: 2
    Window: 2, Time: 450 Value: 3
    Window: 2, Time: 455 Value: 3
Checking for out-of-order timestamps
Finally, here is an example of one way you might check the non-decreasing timestamp constraint:
    // Add this to the same static class as WindowByTimestamp.
    // InvalidDataException lives in System.IO, so add "using System.IO;".
    public static IObservable<TSource> EnsureNonDecreasing<TSource, TComparedProperty>(
        this IObservable<TSource> source,
        Func<TSource, TComparedProperty> comparedPropertySelector)
        where TComparedProperty : IComparable<TComparedProperty>
    {
        return Observable.Create((IObserver<TSource> o) =>
        {
            bool started = false;
            var last = default(TComparedProperty);

            return source.Subscribe(
                x =>
                {
                    var current = comparedPropertySelector(x);
                    if (started && current.CompareTo(last) < 0)
                    {
                        // You might want to provide more info here,
                        // such as the offending element.
                        o.OnError(new InvalidDataException(
                            "Source contained a decreasing element."));
                        return;
                    }
                    started = true;
                    last = current;
                    o.OnNext(x);
                },
                ex => o.OnError(ex),
                () => o.OnCompleted());
        });
    }
To test this, alter the test above to include an out-of-order DateTimeOffset, amend the assignment of the windows variable to include the check, and update the Subscribe call to print out the error:
    var windows = live
        .EnsureNonDecreasing(x => x.Timestamp) // added operator
        .WindowByTimestamp(
            x => x.Timestamp.Ticks,
            TimeSpan.FromTicks(200));
And:
    numberedWindows.Subscribe(
        x => Console.WriteLine(
            "Window: {0}, Time: {1} Value: {2}",
            x.WindowNumber, x.Timestamp.Ticks, x.Value),
        ex => Console.WriteLine(ex.Message)); // added line
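For comparison, the same non-decreasing check can be sketched in pull-based form over IEnumerable, which makes the behaviour easy to try without Rx. `EnsureNonDecreasingSeq` is a hypothetical name; instead of calling OnError, it throws the same InvalidDataException at the first decreasing element.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Pull-based analogue of EnsureNonDecreasing (a sketch, not the Rx operator):
// yields elements until one compares lower than its predecessor, then throws.
static IEnumerable<T> EnsureNonDecreasingSeq<T, TKey>(
    IEnumerable<T> source, Func<T, TKey> selector)
    where TKey : IComparable<TKey>
{
    bool started = false;
    TKey last = default!;
    foreach (var x in source)
    {
        var current = selector(x);
        if (started && current.CompareTo(last) < 0)
            throw new InvalidDataException("Source contained a decreasing element.");
        started = true;
        last = current;
        yield return x;
    }
}

var seen = new List<long>();
string? error = null;
try
{
    // 150 arrives after 200, so enumeration stops there with an exception.
    foreach (var t in EnsureNonDecreasingSeq(new long[] { 100, 200, 150, 300 }, v => v))
        seen.Add(t);
}
catch (InvalidDataException ex)
{
    error = ex.Message;
}

Console.WriteLine($"Seen: {string.Join(", ", seen)}; error: {error}");
```

Equal consecutive values pass the check, matching the "equal to or later" rule for timestamps.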