Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pubspec.lock
modules/.settings
.vscode
modules/.vscode
modules/angular2/src/rx

# Don't check in secret files
*secret.js
Expand Down
6 changes: 3 additions & 3 deletions gulpfile.js
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ gulp.task('serve.e2e.dart', ['build.js.cjs'], function(neverDone) {
// d.ts generation
var Dgeni = require('dgeni');

gulp.task('docs/typings', [], function() {
gulp.task('docs/typings', function() {
try {
var dgeni = new Dgeni([require('./docs/typescript-definition-package')]);
return dgeni.generate();
Expand Down Expand Up @@ -448,7 +448,7 @@ function runKarma(configFile, done) {

gulp.task('test.js', function(done) {
runSequence('test.unit.tools/ci', 'test.transpiler.unittest', 'test.unit.js/ci',
'test.unit.cjs/ci', 'test.typings', 'test.typings.npm', sequenceComplete(done));
'test.unit.cjs/ci', 'test.typings.npm', sequenceComplete(done));
});

gulp.task('test.dart', function(done) {
Expand Down Expand Up @@ -1130,7 +1130,7 @@ var JS_DEV_DEPS = [

// Splice in RX license if rx is in the bundle.
function insertRXLicense(source) {
var n = source.indexOf('System.register("@reactivex/rxjs/dist/cjs/Subject"');
var n = source.indexOf('System.register("@reactivex/rxjs/dist/cjs/Rx"');
if (n >= 0) {
var rxLicense = licenseWrap('node_modules/@reactivex/rxjs/LICENSE.txt');
return source.slice(0, n) + rxLicense + source.slice(n);
Expand Down
60 changes: 27 additions & 33 deletions modules/angular2/src/core/facade/async.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import {global, isPresent} from 'angular2/src/core/facade/lang';
// without depending on rxjs.
import {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/core/facade/promise';
export {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/core/facade/promise';
// TODO(jeffbcross): use ES6 import once typings are available
var Subject = require('@reactivex/rxjs/dist/cjs/Subject');
import {Observable, Subject, Subscription} from '@reactivex/rxjs/dist/cjs/Rx';
export {Observable, Subject, Subscription} from '@reactivex/rxjs/dist/cjs/Rx';

export namespace NodeJS {
export interface Timer {}
Expand All @@ -24,31 +24,26 @@ export class TimerWrapper {

export class ObservableWrapper {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inconsistant method parameter type:

subscribe(emitter: Observable<any>, ...)
callXxx(emitter: EventEmitter, ...)

// TODO(vsavkin): when we use rxnext, try inferring the generic type from the first arg
static subscribe<T>(emitter: Observable, onNext: (value: T) => void,
onThrow: (exception: any) => void = null,
onReturn: () => void = null): Object {
return emitter.observer({next: onNext, throw: onThrow, return: onReturn});
static subscribe<T>(emitter: Observable<any>, onNextOrObserver: (value: T) => void,
onError: (exception: any) => void = null,
onComplete: () => void = null): Object {
return emitter.subscribe(onNextOrObserver, onError, onComplete);
}

static isObservable(obs: any): boolean { return obs instanceof Observable; }

/**
* Returns whether `obs` has any subscribers listening to events.
*/
static hasSubscribers(obs: EventEmitter): boolean { return obs._subject.observers.length > 0; }
static hasSubscribers(obs: EventEmitter<any>): boolean { return obs.observers.length > 0; }

static dispose(subscription: any) { subscription.unsubscribe(); }

static callNext(emitter: EventEmitter, value: any) { emitter.next(value); }
static callNext(emitter: EventEmitter<any>, value: any) { emitter.next(value); }

static callThrow(emitter: EventEmitter, error: any) { emitter.throw(error); }
static callThrow(emitter: EventEmitter<any>, error: any) { emitter.error(error); }

static callReturn(emitter: EventEmitter) { emitter.return (null); }
}

// TODO: vsavkin change to interface
export class Observable {
observer(generator: any): Object { return null; }
static callReturn(emitter: EventEmitter<any>) { emitter.complete(); }
}

/**
Expand Down Expand Up @@ -77,9 +72,9 @@ export class Observable {
* toggle() {
* this.visible = !this.visible;
* if (this.visible) {
* this.open.next(null);
* this.open.emit(null);
* } else {
* this.close.next(null);
* this.close.emit(null);
* }
* }
* }
Expand All @@ -90,9 +85,7 @@ export class Observable {
*
* Once a reference implementation of the spec is available, switch to it.
*/
export class EventEmitter extends Observable {
/** @internal */
_subject = new Subject();
export class EventEmitter<T> extends Subject<T> {
/** @internal */
_isAsync: boolean;

Expand All @@ -105,19 +98,20 @@ export class EventEmitter extends Observable {
this._isAsync = isAsync;
}

observer(generator: any): any {
var schedulerFn = this._isAsync ? (value) => { setTimeout(() => generator.next(value)); } :
(value) => { generator.next(value); };
return this._subject.subscribe(schedulerFn,
(error) => generator.throw ? generator.throw(error) : null,
() => generator.return ? generator.return () : null);
}
emit(value: any) { this.next(value); }

toRx(): any { return this._subject; }
subscribe(observerOrNext: any, error?: any, complete?: any): Subscription<T> {
let generator;

next(value: any) { this._subject.next(value); }
if (observerOrNext && typeof observerOrNext === 'object') {
generator = observerOrNext;
} else if (observerOrNext) {
generator = {next: observerOrNext, error: error, complete: complete};
}

throw(error: any) { this._subject.error(error); }

return (value?: any) { this._subject.complete(); }
}
var schedulerFn = this._isAsync ? (value) => { setTimeout(() => generator.next(value)); } :
(value) => { generator.next(value); };
return super.subscribe(schedulerFn, (error) => generator.error ? generator.error(error) : null,
() => generator.complete ? generator.complete() : null);
}
}
4 changes: 2 additions & 2 deletions modules/angular2/src/core/forms/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ export class AbstractControl {
/** @internal */
_parent: ControlGroup | ControlArray;
/** @internal */
_valueChanges: EventEmitter;
_valueChanges: EventEmitter<any>;

constructor(public validator: Function) {}

Expand All @@ -74,7 +74,7 @@ export class AbstractControl {

get untouched(): boolean { return !this._touched; }

get valueChanges(): Observable { return this._valueChanges; }
get valueChanges(): Observable<any> { return this._valueChanges; }

markAsTouched(): void { this._touched = true; }

Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/core/linker/query_list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export class QueryList<T> {
private _results: Array<T> = [];
private _emitter = new EventEmitter();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use Subject directly, so changes can return Observable<T>? Or use generic EventEmitter.


get changes(): Observable { return this._emitter; }
get changes(): Observable<any> { return this._emitter; }
get length(): number { return this._results.length; }
get first(): T { return ListWrapper.first(this._results); }
get last(): T { return ListWrapper.last(this._results); }
Expand Down
10 changes: 5 additions & 5 deletions modules/angular2/src/core/pipes/async_pipe.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {isBlank, isPresent, isPromise, CONST} from 'angular2/src/core/facade/lang';
import {Promise, ObservableWrapper, Observable} from 'angular2/src/core/facade/async';
import {Promise, ObservableWrapper, Observable, EventEmitter} from 'angular2/src/core/facade/async';
import {Pipe} from 'angular2/src/core/metadata';
import {Injectable} from 'angular2/src/core/di';
import {
Expand Down Expand Up @@ -69,7 +69,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
/** @internal */
_subscription: Object = null;
/** @internal */
_obj: Observable | Promise<any> = null;
_obj: Observable<any>| Promise<any>| EventEmitter<any> = null;
private _strategy: any = null;
/** @internal */
public _ref: ChangeDetectorRef;
Expand All @@ -81,7 +81,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}
}

transform(obj: Observable | Promise<any>, args?: any[]): any {
transform(obj: Observable<any>| Promise<any>| EventEmitter<any>, args?: any[]): any {
if (isBlank(this._obj)) {
if (isPresent(obj)) {
this._subscribe(obj);
Expand All @@ -103,15 +103,15 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}

/** @internal */
_subscribe(obj: Observable | Promise<any>): void {
_subscribe(obj: Observable<any>| Promise<any>| EventEmitter<any>): void {
this._obj = obj;
this._strategy = this._selectStrategy(obj);
this._subscription =
this._strategy.createSubscription(obj, value => this._updateLatestValue(obj, value));
}

/** @internal */
_selectStrategy(obj: Observable | Promise<any>): any {
_selectStrategy(obj: Observable<any>| Promise<any>| EventEmitter<any>): any {
if (isPromise(obj)) {
return _promiseStrategy;
} else if (ObservableWrapper.isObservable(obj)) {
Expand Down
8 changes: 4 additions & 4 deletions modules/angular2/src/core/zone/ng_zone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ export class NgZone {
_onErrorHandler: ErrorHandlingFn;

/** @internal */
_onTurnStartEvents: EventEmitter;
_onTurnStartEvents: EventEmitter<any>;
/** @internal */
_onTurnDoneEvents: EventEmitter;
_onTurnDoneEvents: EventEmitter<any>;
/** @internal */
_onEventDoneEvents: EventEmitter;
_onEventDoneEvents: EventEmitter<any>;
/** @internal */
_onErrorEvents: EventEmitter;
_onErrorEvents: EventEmitter<any>;

// Number of microtasks pending from _innerZone (& descendants)
/** @internal */
Expand Down
5 changes: 2 additions & 3 deletions modules/angular2/src/http/backends/jsonp_backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import {BrowserJsonp} from './browser_jsonp';
import {EventEmitter, ObservableWrapper} from 'angular2/src/core/facade/async';
import {makeTypeError} from 'angular2/src/core/facade/exceptions';
import {StringWrapper, isPresent} from 'angular2/src/core/facade/lang';
// todo(robwormald): temporary until https://github.com/angular/angular/issues/4390 decided
var Rx = require('@reactivex/rxjs/dist/cjs/Rx');
var {Observable} = Rx;
import {Observable} from 'angular2/core';

export abstract class JSONPConnection implements Connection {
readyState: ReadyStates;
request: Request;
Expand Down
53 changes: 44 additions & 9 deletions modules/angular2/src/http/backends/mock_backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import {ReadyStates} from '../enums';
import {Connection, ConnectionBackend} from '../interfaces';
import {isPresent} from 'angular2/src/core/facade/lang';
import {BaseException, WrappedException} from 'angular2/src/core/facade/exceptions';
var Rx = require('@reactivex/rxjs/dist/cjs/Rx');
let{Subject, ReplaySubject} = Rx;
import {Observable, EventEmitter} from 'angular2/core';

/**
*
Expand All @@ -27,14 +26,40 @@ export class MockConnection implements Connection {
*/
request: Request;

_mockResponse: any;
_mockErrorResponse: any;

/**
* {@link EventEmitter} of {@link Response}. Can be subscribed to in order to be notified when a
* response is available.
*/
response: any; // Subject<Response>

_next: (value: any) => void;
_error: (value: any) => void;
_complete: () => void;

constructor(req: Request) {
this.response = new ReplaySubject(1).take(1);
this.response = new Observable(mockResponseObserver => {

if (this._mockResponse) {
mockResponseObserver.next(this._mockResponse);
mockResponseObserver.complete();
return;
} else if (this._mockErrorResponse) {
mockResponseObserver.error(this._mockErrorResponse);
return;
}

else {
this._next = (res) => mockResponseObserver.next(res);
this._error = (err) => mockResponseObserver.error(err);
this._complete = () => mockResponseObserver.complete();
}
return () => {

}
});
this.readyState = ReadyStates.Open;
this.request = req;
}
Expand All @@ -58,8 +83,13 @@ export class MockConnection implements Connection {
throw new BaseException('Connection has already been resolved');
}
this.readyState = ReadyStates.Done;
this.response.next(res);
this.response.complete();

if (this._next) {
this._next(res);
this._complete();
} else {
this._mockResponse = res;
}
}

/**
Expand All @@ -84,7 +114,12 @@ export class MockConnection implements Connection {
mockError(err?: Error) {
// Matches XHR semantics
this.readyState = ReadyStates.Done;
this.response.error(err);

if (this._error) {
this._error(err);
} else {
this._mockErrorResponse = err;
}
}
}

Expand Down Expand Up @@ -153,7 +188,7 @@ export class MockBackend implements ConnectionBackend {
*
* This property only exists in the mock implementation, not in real Backends.
*/
connections: any; //<MockConnection>
connections: EventEmitter<any>; //<MockConnection>

/**
* An array representation of `connections`. This array will be updated with each connection that
Expand All @@ -173,9 +208,9 @@ export class MockBackend implements ConnectionBackend {
pendingConnections: any; // Subject<MockConnection>
constructor() {
this.connectionsArray = [];
this.connections = new Subject();
this.connections = new EventEmitter();
this.connections.subscribe(connection => this.connectionsArray.push(connection));
this.pendingConnections = new Subject();
this.pendingConnections = new EventEmitter();
}

/**
Expand Down
7 changes: 3 additions & 4 deletions modules/angular2/src/http/backends/xhr_backend.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ import {ResponseOptions, BaseResponseOptions} from '../base_response_options';
import {Injectable} from 'angular2/angular2';
import {BrowserXhr} from './browser_xhr';
import {isPresent} from 'angular2/src/core/facade/lang';
// todo(robwormald): temporary until https://github.com/angular/angular/issues/4390 decided
var Rx = require('@reactivex/rxjs/dist/cjs/Rx');
var {Observable} = Rx;
import {Observable} from 'angular2/core';

/**
* Creates connections using `XMLHttpRequest`. Given a fully-qualified
* request, an `XHRConnection` will immediately create an `XMLHttpRequest` object and send the
Expand All @@ -23,7 +22,7 @@ export class XHRConnection implements Connection {
* Response {@link EventEmitter} which emits a single {@link Response} value on load event of
* `XMLHttpRequest`.
*/
response: any; // TODO: Make generic of <Response>;
response: Observable<Response>; // TODO: Make generic of <Response>;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment can be removed now.

readyState: ReadyStates;
constructor(req: Request, browserXHR: BrowserXhr, baseResponseOptions?: ResponseOptions) {
this.request = req;
Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/mock/location_mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class SpyLocation implements Location {
/** @internal */
_query: string = '';
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
/** @internal */
_baseHref: string = '';

Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/mock/mock_location_strategy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class MockLocationStrategy extends LocationStrategy {
internalTitle: string = '';
urlChanges: string[] = [];
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
constructor() { super(); }

simulatePopState(url: string): void {
Expand Down
2 changes: 1 addition & 1 deletion modules/angular2/src/router/location.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export const APP_BASE_HREF: OpaqueToken = CONST_EXPR(new OpaqueToken('appBaseHre
@Injectable()
export class Location {
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
/** @internal */
_baseHref: string;

Expand Down
Loading