Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ gulp.task('static-checks', ['!build.tools'], function(done) {
var tmpdir = path.join(os.tmpdir(), 'test.typings', new Date().getTime().toString());
gulp.task('!pre.test.typings.layoutNodeModule', ['build.js.cjs'], function() {
return gulp
.src(['dist/js/cjs/angular2/**/*'], {base: 'dist/js/cjs'})
.src(['dist/js/cjs/angular2/**/*', 'node_modules/@reactivex/rxjs/dist/cjs/**'], {base: 'dist/js/cjs'})
.pipe(gulp.dest(path.join(tmpdir, 'node_modules')));
});
gulp.task('!pre.test.typings.copyTypingsSpec', function() {
Expand All @@ -803,6 +803,7 @@ gulp.task('test.typings', [
.pipe(tsc({target: 'ES5', module: 'commonjs',
experimentalDecorators: true,
noImplicitAny: true,
moduleResolution: 'node',
typescript: require('typescript')}));
});

Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/core/facade.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Public API for Facade
export {ConcreteType, Type} from './facade/lang';
export {Observable, EventEmitter} from './facade/async';
export {Observable, EventEmitter, Subject} from './facade/async';
export {Predicate} from './facade/collection';
export {WrappedException} from './facade/exceptions';
33 changes: 30 additions & 3 deletions modules/angular2/src/core/facade/async.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ class ObservableWrapper {
emitter.add(value);
}

static void callThrow(EventEmitter emitter, error) {
static void callError(EventEmitter emitter, error) {
emitter.addError(error);
}

static void callReturn(EventEmitter emitter) {
static void callComplete(EventEmitter emitter) {
emitter.close();
}
}

class EventEmitter extends Stream {
class EventEmitter<T> extends Stream<T> {
StreamController<dynamic> _controller;

/// Creates an instance of [EventEmitter], which depending on [isAsync],
Expand All @@ -86,3 +86,30 @@ class EventEmitter extends Stream {
_controller.close();
}
}

//todo(robwormald): maybe fix in ts2dart?
class Subject<T> extends Stream<T> {
StreamController<dynamic> _controller;

Subject([bool isAsync = true]) {
_controller = new StreamController.broadcast(sync: !isAsync);
}

StreamSubscription listen(void onData(dynamic line),
{void onError(Error error), void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}

void add(value) {
_controller.add(value);
}

void addError(error) {
_controller.addError(error);
}

void close() {
_controller.close();
}
}
68 changes: 36 additions & 32 deletions modules/angular2/src/core/facade/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import {global, isPresent} from 'angular2/src/core/facade/lang';
// without depending on rxjs.
import {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/core/facade/promise';
export {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/core/facade/promise';
// TODO(jeffbcross): use ES6 import once typings are available
var Subject = require('@reactivex/rxjs/dist/cjs/Subject');
import {Subject, Subscription, Observable as RxObservable} from '@reactivex/rxjs/dist/cjs/Rx';
export {Subject} from '@reactivex/rxjs/dist/cjs/Rx';
import Operator from '@reactivex/rxjs/dist/cjs/Operator';

export namespace NodeJS {
export interface Timer {}
Expand All @@ -24,31 +25,25 @@ export class TimerWrapper {

export class ObservableWrapper {
// TODO(vsavkin): when we use rxnext, try inferring the generic type from the first arg
static subscribe<T>(emitter: Observable, onNext: (value: T) => void,
onThrow: (exception: any) => void = null,
onReturn: () => void = null): Object {
return emitter.observer({next: onNext, throw: onThrow, return: onReturn});
static subscribe<T>(emitter: any, onNext: (value: T) => void, onError?: (exception: any) => void,
onComplete?: () => void): Object {
return emitter.subscribe({next: onNext, error: onError, complete: onComplete});
}

static isObservable(obs: any): boolean { return obs instanceof Observable; }
static isObservable(obs: any): boolean { return obs instanceof RxObservable; }

/**
* Returns whether `obs` has any subscribers listening to events.
*/
static hasSubscribers(obs: EventEmitter): boolean { return obs._subject.observers.length > 0; }
static hasSubscribers(obs: EventEmitter<any>): boolean { return obs.observers.length > 0; }

static dispose(subscription: any) { subscription.unsubscribe(); }

static callNext(emitter: EventEmitter, value: any) { emitter.next(value); }
static callNext(emitter: EventEmitter<any>, value: any) { emitter.next(value); }

static callThrow(emitter: EventEmitter, error: any) { emitter.throw(error); }
static callError(emitter: EventEmitter<any>, error: any) { emitter.error(error); }

static callReturn(emitter: EventEmitter) { emitter.return (null); }
}

// TODO: vsavkin change to interface
export class Observable {
observer(generator: any): Object { return null; }
static callComplete(emitter: EventEmitter<any>) { emitter.complete(); }
}

/**
Expand Down Expand Up @@ -90,9 +85,7 @@ export class Observable {
*
* Once a reference implementation of the spec is available, switch to it.
*/
export class EventEmitter extends Observable {
/** @internal */
_subject = new Subject();
export class EventEmitter<T> extends Subject<T> {
/** @internal */
_isAsync: boolean;

Expand All @@ -105,19 +98,30 @@ export class EventEmitter extends Observable {
this._isAsync = isAsync;
}

observer(generator: any): any {
var schedulerFn = this._isAsync ? (value) => { setTimeout(() => generator.next(value)); } :
(value) => { generator.next(value); };
return this._subject.subscribe(schedulerFn,
(error) => generator.throw ? generator.throw(error) : null,
() => generator.return ? generator.return () : null);
subscribe(generatorOrNext?: any, error?: any, complete?: any): any {
if (generatorOrNext && typeof generatorOrNext === 'object') {
let schedulerFn = this._isAsync ?
(value) => { setTimeout(() => generatorOrNext.next(value)); } :
(value) => { generatorOrNext.next(value); };
return super.subscribe(schedulerFn,
(err) => generatorOrNext.error ? generatorOrNext.error(err) : null,
() => generatorOrNext.complete ? generatorOrNext.complete() : null);
} else {
let schedulerFn = this._isAsync ? (value) => { setTimeout(() => generatorOrNext(value)); } :
(value) => { generatorOrNext(value); };

return super.subscribe(schedulerFn, (err) => error ? error(err) : null,
() => complete ? complete() : null);
}
}
}

toRx(): any { return this._subject; }

next(value: any) { this._subject.next(value); }

throw(error: any) { this._subject.error(error); }

return (value?: any) { this._subject.complete(); }
// todo(robwormald): ts2dart should handle this properly
export class Observable<T> extends RxObservable<T> {
lift<T, R>(operator: Operator<T, R>): Observable<T> {
const observable = new Observable();
observable.source = this;
observable.operator = operator;
return observable;
}
}
5 changes: 3 additions & 2 deletions modules/angular2/src/core/forms/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export abstract class AbstractControl {
_value: any;

/** @internal */
_valueChanges: EventEmitter;
_valueChanges: EventEmitter<any>;

private _status: string;
private _errors: {[key: string]: any};
Expand Down Expand Up @@ -86,7 +86,8 @@ export abstract class AbstractControl {

get untouched(): boolean { return !this._touched; }

get valueChanges(): Observable { return this._valueChanges; }
get valueChanges(): Observable<any> { return this._valueChanges; }

get pending(): boolean { return this._status == PENDING; }

markAsTouched(): void { this._touched = true; }
Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/core/linker/query_list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class QueryList<T> {
private _results: Array<T> = [];
private _emitter = new EventEmitter();

get changes(): Observable { return this._emitter; }
get changes(): Observable<any> { return this._emitter; }
get length(): number { return this._results.length; }
get first(): T { return ListWrapper.first(this._results); }
get last(): T { return ListWrapper.last(this._results); }
Expand Down
10 changes: 5 additions & 5 deletions modules/angular2/src/core/pipes/async_pipe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {isBlank, isPresent, isPromise, CONST} from 'angular2/src/core/facade/lang';
import {Promise, ObservableWrapper, Observable} from 'angular2/src/core/facade/async';
import {Promise, ObservableWrapper, Observable, EventEmitter} from 'angular2/src/core/facade/async';
import {Pipe} from 'angular2/src/core/metadata';
import {Injectable} from 'angular2/src/core/di';
import {
Expand Down Expand Up @@ -69,7 +69,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
/** @internal */
_subscription: Object = null;
/** @internal */
_obj: Observable | Promise<any> = null;
_obj: Observable<any>| Promise<any>| EventEmitter<any> = null;
private _strategy: any = null;
/** @internal */
public _ref: ChangeDetectorRef;
Expand All @@ -81,7 +81,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}
}

transform(obj: Observable | Promise<any>, args?: any[]): any {
transform(obj: Observable<any>| Promise<any>| EventEmitter<any>, args?: any[]): any {
if (isBlank(this._obj)) {
if (isPresent(obj)) {
this._subscribe(obj);
Expand All @@ -103,15 +103,15 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}

/** @internal */
_subscribe(obj: Observable | Promise<any>): void {
_subscribe(obj: Observable<any>| Promise<any>| EventEmitter<any>): void {
this._obj = obj;
this._strategy = this._selectStrategy(obj);
this._subscription =
this._strategy.createSubscription(obj, value => this._updateLatestValue(obj, value));
}

/** @internal */
_selectStrategy(obj: Observable | Promise<any>): any {
_selectStrategy(obj: Observable<any>| Promise<any>| EventEmitter<any>): any {
if (isPromise(obj)) {
return _promiseStrategy;
} else if (ObservableWrapper.isObservable(obj)) {
Expand Down
8 changes: 4 additions & 4 deletions modules/angular2/src/core/zone/ng_zone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ export class NgZone {
_onErrorHandler: ErrorHandlingFn;

/** @internal */
_onTurnStartEvents: EventEmitter;
_onTurnStartEvents: EventEmitter<any>;
/** @internal */
_onTurnDoneEvents: EventEmitter;
_onTurnDoneEvents: EventEmitter<any>;
/** @internal */
_onEventDoneEvents: EventEmitter;
_onEventDoneEvents: EventEmitter<any>;
/** @internal */
_onErrorEvents: EventEmitter;
_onErrorEvents: EventEmitter<any>;

// Number of microtasks pending from _innerZone (& descendants)
/** @internal */
Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/mock/location_mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class SpyLocation implements Location {
/** @internal */
_query: string = '';
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
/** @internal */
_baseHref: string = '';

Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/mock/mock_location_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class MockLocationStrategy extends LocationStrategy {
internalTitle: string = '';
urlChanges: string[] = [];
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
constructor() { super(); }

simulatePopState(url: string): void {
Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/router/location.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export const APP_BASE_HREF: OpaqueToken = CONST_EXPR(new OpaqueToken('appBaseHre
@Injectable()
export class Location {
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
/** @internal */
_baseHref: string;

Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/router/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class Router {
private _auxRouters = new Map<string, Router>();
private _childRouter: Router;

private _subject: EventEmitter = new EventEmitter();
private _subject: EventEmitter<any> = new EventEmitter();


constructor(public registry: RouteRegistry, public parent: Router, public hostComponent: any) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export abstract class ClientMessageBroker {

export class ClientMessageBroker_ extends ClientMessageBroker {
private _pending: Map<string, PromiseCompleter<any>> = new Map<string, PromiseCompleter<any>>();
private _sink: EventEmitter;
private _sink: EventEmitter<any>;
/** @internal */
public _serializer: Serializer;

Expand Down
8 changes: 4 additions & 4 deletions modules/angular2/src/web_workers/shared/message_bus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ export abstract class MessageBus implements MessageBusSource, MessageBusSink {
* Returns an {@link EventEmitter} that emits every time a message
* is received on the given channel.
*/
abstract from(channel: string): EventEmitter;
abstract from(channel: string): EventEmitter<any>;


/**
* Returns an {@link EventEmitter} for the given channel
* To publish methods to that channel just call next (or add in dart) on the returned emitter
*/
abstract to(channel: string): EventEmitter;
abstract to(channel: string): EventEmitter<any>;
}

export interface MessageBusSource {
Expand All @@ -60,7 +60,7 @@ export interface MessageBusSource {
* Returns an {@link EventEmitter} that emits every time a message
* is received on the given channel.
*/
from(channel: string): EventEmitter;
from(channel: string): EventEmitter<any>;
}

export interface MessageBusSink {
Expand All @@ -83,5 +83,5 @@ export interface MessageBusSink {
* Returns an {@link EventEmitter} for the given channel
* To publish methods to that channel just call next (or add in dart) on the returned emitter
*/
to(channel: string): EventEmitter;
to(channel: string): EventEmitter<any>;
}
Loading