diff --git a/.github/workflows/compute-entropy.py b/.github/workflows/compute-entropy.py
new file mode 100644
index 0000000000..42655913bb
--- /dev/null
+++ b/.github/workflows/compute-entropy.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python3
+import sys, math, json, subprocess
+from collections import Counter
+from pathlib import Path
+
+def shannon_entropy(text: str) -> float:
+ if not text or len(text) < 10:
+ return 0.0
+ freq = Counter(text)
+ probs = [count / len(text) for count in freq.values()]
+ return -sum(p * math.log2(p) for p in probs if p > 0)
+
+# Get changed files safely for pull_request events
+changed_files = []
+try:
+ # GitHub provides github.event.pull_request.base.sha and head.sha in the context
+ base_sha = subprocess.check_output(['git', 'rev-parse', 'origin/${{ github.base_ref }}'], text=True).strip()
+ changed_files = subprocess.check_output(
+ ['git', 'diff', '--name-only', base_sha, 'HEAD'], text=True
+ ).splitlines()
+except subprocess.CalledProcessError:
+ # Fallback for first-time PRs or edge cases: use the merge-base or just files in HEAD
+ try:
+ changed_files = subprocess.check_output(
+ ['git', 'diff', '--name-only', 'HEAD~1', 'HEAD'], text=True
+ ).splitlines()
+ except subprocess.CalledProcessError:
+ # Last resort: all files in the repo
+ changed_files = subprocess.check_output(['git', 'ls-files'], text=True).splitlines()
+
+results = []
+total_ent = 0.0
+count = 0
+
+for f in changed_files:
+ path = Path(f.strip())
+ if not path.exists() or path.suffix in {'.png', '.jpg', '.gif', '.bin', '.lock', '.exe', '.dll', '.so'}:
+ continue
+ try:
+ content = path.read_text(encoding='utf-8', errors='ignore')
+ ent = shannon_entropy(content)
+ results.append(f"{f}: {ent:.3f}")
+ total_ent += ent
+ count += 1
+ except Exception:
+ pass
+
+avg = round(total_ent / count, 3) if count > 0 else 0.0
+
+verdict = (
+ "✅ Mid-4 beauty detected (thoughtful human code!)" if 4.3 <= avg <= 4.7 else
+ "⚠️ Consider review — entropy outside sweet spot" if avg > 0 else
+ "No source files changed"
+)
+
+with open('/tmp/beauty.json', 'w') as f:
+ json.dump({
+ "average_entropy": avg,
+ "verdict": verdict,
+ "files": results[:20]
+ }, f, indent=2)
+
+print(f"Average entropy: {avg}")
+print(verdict)
diff --git a/.github/workflows/entropy-beauty-scan.yml b/.github/workflows/entropy-beauty-scan.yml
new file mode 100644
index 0000000000..0923ba510d
--- /dev/null
+++ b/.github/workflows/entropy-beauty-scan.yml
@@ -0,0 +1,123 @@
+name: Entropy Beauty + TruffleHog Scan
+
+on: [push, release, pull_request, pull_request_target]
+
+permissions:
+ contents: read
+ pull-requests: write
+ issues: write # must be at workflow level for push/merge events
+
+jobs:
+ scan:
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout code (full history)
+ uses: actions/checkout@v6
+ with:
+ fetch-depth: 0
+
+ - name: Run TruffleHog
+ uses: trufflesecurity/trufflehog@main
+ with:
+ path: .
+ extra_args: --results=verified,unknown --filter-entropy=3.5 --json
+
+ - name: Compute mid-4 beauty entropy
+ run: python .github/workflows/compute-entropy.py
+
+ - name: Post summary comment (PR only)
+ if: github.event_name == 'pull_request' || github.event_name == 'pull_request_target'
+ uses: actions/github-script@v8
+ with:
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ script: |
+ const fs = require('fs');
+
+ // Read TruffleHog output — it prints one JSON object per line (NDJSON)
+ let findings = [];
+ if (fs.existsSync('trufflehog.json')) {
+ try {
+ const lines = fs.readFileSync('trufflehog.json', 'utf8').trim().split('\n');
+ findings = lines.map(line => {
+ try { return JSON.parse(line); } catch(e) { return null; }
+ }).filter(Boolean);
+ } catch(e) {}
+ } else {
+ // Fallback: the action also logs to GITHUB_STEP_SUMMARY, but we use the file from the Python step
+ console.log("No trufflehog.json found, using empty findings");
+ }
+
+ const beauty = JSON.parse(fs.readFileSync('/tmp/beauty.json', 'utf8'));
+
+ let body = `## 🐷 TruffleHog + Entropy Beauty Scan\n\n`;
+ body += `**Average entropy of changed code:** ${beauty.average_entropy} bits/char\n`;
+ body += `**Verdict:** ${beauty.verdict}\n\n`;
+
+ if (beauty.files && beauty.files.length) {
+ body += `**Changed files entropy:**\n\`\`\`\n${beauty.files.join('\n')}\n\`\`\`\n\n`;
+ }
+
+ if (findings.length > 0) {
+ body += `⚠️ **TruffleHog found ${findings.length} potential issue(s)**\n`;
+ } else {
+ body += `✅ No secrets or suspicious high-entropy strings found.\n`;
+ }
+
+ body += `\n*Mid-4 beauty heuristic in action — powered by our entropy chats! 😊*`;
+
+ await github.rest.issues.createComment({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ issue_number: context.issue.number,
+ body: body
+ });
+ # ── Create issue on push ONLY if suspicious (entropy outside 4.3–4.7) ──
+ - name: Create issue on suspicious push
+ if: github.event_name == 'push' || github.event_name == 'release'
+ uses: actions/github-script@v8
+ with:
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ script: |
+ const fs = require('fs');
+ const beauty = JSON.parse(fs.readFileSync('/tmp/beauty.json', 'utf8'));
+
+ // Only create issue if it's NOT beautiful mid-4
+ if (beauty.average_entropy >= 4.3 && beauty.average_entropy <= 4.7) {
+ console.log("✅ Mid-4 beauty — no issue created");
+ return;
+ }
+
+ let findings = [];
+ if (fs.existsSync('trufflehog.json')) {
+ try {
+ const lines = fs.readFileSync('trufflehog.json', 'utf8').trim().split('\n');
+ findings = lines.map(line => {
+ try { return JSON.parse(line); } catch(e) { return null; }
+ }).filter(Boolean);
+ } catch(e) {}
+ }
+
+ let body = `**Average entropy:** ${beauty.average_entropy} bits/char\n\n`;
+ body += `**Verdict:** ${beauty.verdict}\n\n`;
+
+ if (beauty.files && beauty.files.length) {
+ body += `**Changed files:**\n\`\`\`\n${beauty.files.join('\n')}\n\`\`\`\n\n`;
+ }
+
+ if (findings.length > 0) {
+ body += `**TruffleHog found ${findings.length} potential issue(s)**\n`;
+ } else {
+ body += `✅ No secrets or suspicious high-entropy strings found.\n`;
+ }
+
+ body += `\n*Triggered by push to \`${context.sha}\` — mid-4 beauty heuristic*`;
+
+ await github.rest.issues.create({
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ title: `🚨 Suspicious entropy detected in recent push (${beauty.average_entropy})`,
+ body: body,
+ labels: ['entropy', 'security', 'review-needed']
+ });
+
+ console.log("⚠️ Created issue because entropy was outside mid-4 range");
diff --git a/.github/workflows/gradle-wrapper-validation.yml b/.github/workflows/gradle-wrapper-validation.yml
index 44e5c1a692..4174da2072 100644
--- a/.github/workflows/gradle-wrapper-validation.yml
+++ b/.github/workflows/gradle-wrapper-validation.yml
@@ -9,5 +9,5 @@ jobs:
name: "Validation"
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- uses: gradle/wrapper-validation-action@f9c9c575b8b21b6485636a91ffecd10e558c62f6 # v3.5.0
diff --git a/.github/workflows/gradle_branch.yml b/.github/workflows/gradle_branch.yml
index 650cbe7c05..ef7ad91cc2 100644
--- a/.github/workflows/gradle_branch.yml
+++ b/.github/workflows/gradle_branch.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_jdk11.yml b/.github/workflows/gradle_jdk11.yml
index 33c781fca9..8eab1cf14a 100644
--- a/.github/workflows/gradle_jdk11.yml
+++ b/.github/workflows/gradle_jdk11.yml
@@ -12,19 +12,22 @@ on:
permissions:
contents: read
+env:
+ BUILD_WITH_11: true
+
jobs:
build:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Set up JDK 11
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
with:
distribution: 'zulu'
java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -35,5 +38,5 @@ jobs:
run: ./gradlew -PjavaCompatibility=9 jar
- name: Build RxJava
run: ./gradlew build --stacktrace
- - name: Generate Javadoc
- run: ./gradlew javadoc --stacktrace
+# - name: Generate Javadoc
+# run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_pr.yml b/.github/workflows/gradle_pr.yml
index 712139379f..815b14d963 100644
--- a/.github/workflows/gradle_pr.yml
+++ b/.github/workflows/gradle_pr.yml
@@ -15,14 +15,14 @@ jobs:
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-1-${{ hashFiles('**/*.gradle') }}
@@ -32,6 +32,6 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Generate Javadoc
run: ./gradlew javadoc --stacktrace
diff --git a/.github/workflows/gradle_release.yml b/.github/workflows/gradle_release.yml
index a341c27ffc..3d3ddc27f2 100644
--- a/.github/workflows/gradle_release.yml
+++ b/.github/workflows/gradle_release.yml
@@ -22,14 +22,14 @@ jobs:
env:
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -43,9 +43,18 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
- - name: Upload release
- run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
+# - name: Upload release
+# run: ./gradlew -PreleaseMode=full publish --no-daemon --no-parallel --stacktrace
+# env:
+# # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
+# # ------------------------------------------------------------------------------
+# ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
+# ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
+# ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
+# ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
+ - name: Publish release
+ run: ./gradlew -PreleaseMode=full publishAndReleaseToMavenCentral --no-configuration-cache --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
@@ -53,13 +62,6 @@ jobs:
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
ORG_GRADLE_PROJECT_SIGNING_PRIVATE_KEY: ${{ secrets.SIGNING_PRIVATE_KEY }}
ORG_GRADLE_PROJECT_SIGNING_PASSWORD: ${{ secrets.SIGNING_PASSWORD }}
- - name: Publish release
- run: ./gradlew -PreleaseMode=full closeAndReleaseRepository --no-daemon --no-parallel --stacktrace
- env:
- # Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
- # ------------------------------------------------------------------------------
- ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
- ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Push Javadoc
run: ./push_javadoc.sh
env:
diff --git a/.github/workflows/gradle_snapshot.yml b/.github/workflows/gradle_snapshot.yml
index 5a66712f73..e56c9019aa 100644
--- a/.github/workflows/gradle_snapshot.yml
+++ b/.github/workflows/gradle_snapshot.yml
@@ -21,14 +21,14 @@ jobs:
# ------------------------------------------------------------------------------
CI_BUILD_NUMBER: ${{ github.run_number }}
steps:
- - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- - name: Set up JDK 8
- uses: actions/setup-java@c5195efecf7bdfc987ee8bae7a71cb8b11521c00 # v4.7.1
+ - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ - name: Set up JDK 11
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0
with:
distribution: 'zulu'
- java-version: '8'
+ java-version: '11'
- name: Cache Gradle packages
- uses: actions/cache@5a3ec84eff668545956fd18022155c47e93e2684 # v4.2.3
+ uses: actions/cache@cdf6c1fa76f9f475f3d7449005a359c84ca0f306 # v5.0.3
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ secrets.CACHE_VERSION }}-${{ hashFiles('**/*.gradle') }}
@@ -40,14 +40,14 @@ jobs:
- name: Build RxJava
run: ./gradlew build --stacktrace --no-daemon
- name: Upload Snapshot
- run: ./gradlew -PreleaseMode=branch publish --no-daemon --no-parallel --stacktrace
+ run: ./gradlew -PreleaseMode=branch publishAllPublicationsToMavenCentralRepository --no-daemon --no-parallel --stacktrace
env:
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
# ------------------------------------------------------------------------------
ORG_GRADLE_PROJECT_mavenCentralUsername: ${{ secrets.SONATYPE_USER }}
ORG_GRADLE_PROJECT_mavenCentralPassword: ${{ secrets.SONATYPE_PASSWORD }}
- name: Upload to Codecov
- uses: codecov/codecov-action@b9fd7d16f6d7d1b5d2bec1a2887e65ceed900238 # v4.6.0
+ uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
- name: Push Javadoc
run: ./push_javadoc.sh
# Define secrets at https://github.com/ReactiveX/RxJava/settings/secrets/actions
diff --git a/.github/workflows/scorecard.yml b/.github/workflows/scorecard.yml
index 2fe557cec1..26a487cffe 100644
--- a/.github/workflows/scorecard.yml
+++ b/.github/workflows/scorecard.yml
@@ -24,12 +24,12 @@ jobs:
steps:
- name: "Checkout code"
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
+ uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: "Run analysis"
- uses: ossf/scorecard-action@f49aabe0b5af0936a0987cfb85d86b75731b0186 # v2.4.1
+ uses: ossf/scorecard-action@4eaacf0543bb3f2c246792bd56e8cdeffafb205a # v2.4.3
with:
results_file: results.sarif
results_format: sarif
@@ -46,7 +46,7 @@ jobs:
# Upload the results as artifacts (optional). Commenting out will disable uploads of run results in SARIF
# format to the repository Actions tab.
- name: "Upload artifact"
- uses: actions/upload-artifact@ea165f8d65b6e75b540449e92b4886f43607fa02 # v4.6.2
+ uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: SARIF file
path: results.sarif
@@ -54,6 +54,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
- uses: github/codeql-action/upload-sarif@45775bd8235c68ba998cffa5171334d58593da47 # v3.28.15
+ uses: github/codeql-action/upload-sarif@0d579ffd059c29b07949a3cce3983f0780820c98 # v3.29.5
with:
sarif_file: results.sarif
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 98ae7225f3..c1f0974f79 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -4,6 +4,22 @@ If you would like to contribute code you can do so through GitHub by forking the
When submitting code, please make every effort to follow existing conventions and style in order to keep the code as readable as possible.
+## AI contributions
+
+We are not against contributions from AI tools, LLM-based or future architectures. However, you as a human are responsible for its contributions and suggestions.
+
+This means, you have to make sure it doesn't hallucinate issues or elements of the contribution, doesn't try to hack rewards or hack established unit tests, doesn't go wild
+and rearchitect established components.
+
+If you post a contribution that is broken, we will not argue with your LLM or prompt engineer for you. You are responsible for having the the LLM's output work within the confines
+of this project.
+
+Please also be aware that this project is large both in current and historical sense with some rules not documented or enforced by unit tests. This is because such unwritten rules
+were trivial or readily inferrable by humans in the past. The project predates LLMs several years and thus is not organized to be accessible by LLMs today. Nor should it be.
+
+Consequently, the amount of prompting and the context window size to include all possible information about it could become so much that it can become prohibitively expensive to have
+an LLM come up with more than basic and trivial contributions. Needless to say, don't bankrupt yourself and don't just accept the LLM's output at face value.
+
## License
By contributing your code, you agree to license your contribution under the terms of the APLv2: https://github.com/ReactiveX/RxJava/blob/3.x/LICENSE
diff --git a/README.md b/README.md
index 1456d2b68c..5276c0bd37 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[](https://codecov.io/gh/ReactiveX/RxJava/branch/3.x)
-[](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
+[](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava)
[](https://gitpod.io/#https://github.com/ReactiveX/RxJava)
[](https://securityscorecards.dev/viewer/?uri=github.com/ReactiveX/RxJava)
@@ -48,7 +48,7 @@ The first step is to include RxJava 3 into your project, for example, as a Gradl
implementation "io.reactivex.rxjava3:rxjava:3.x.y"
```
-(Please replace `x` and `y` with the latest version numbers: [](https://maven-badges.herokuapp.com/maven-central/io.reactivex.rxjava3/rxjava)
+(Please replace `x` and `y` with the latest version numbers: [](https://maven-badges.sml.io/sonatype-central/io.reactivex.rxjava3/rxjava)
)
### Hello World
@@ -510,7 +510,7 @@ For further details, consult the [wiki](https://github.com/ReactiveX/RxJava/wiki
- Google Group: [RxJava](http://groups.google.com/d/forum/rxjava)
- Twitter: [@RxJava](http://twitter.com/RxJava)
- [GitHub Issues](https://github.com/ReactiveX/RxJava/issues)
-- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java) and [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2)
+- StackOverflow: [rx-java](http://stackoverflow.com/questions/tagged/rx-java), [rx-java2](http://stackoverflow.com/questions/tagged/rx-java2) and [rx-java3](http://stackoverflow.com/questions/tagged/rx-java3)
- [Gitter.im](https://gitter.im/ReactiveX/RxJava)
## Versioning
@@ -571,11 +571,11 @@ and for Ivy:
### Snapshots
-Snapshots after May 1st, 2021 are available via https://oss.sonatype.org/content/repositories/snapshots/io/reactivex/rxjava3/rxjava/
+Snapshots after May 19st, 2025 are available via https://central.sonatype.com/repository/maven-snapshots/io/reactivex/rxjava3/rxjava/
```groovy
repositories {
- maven { url 'https://oss.sonatype.org/content/repositories/snapshots' }
+ maven { url 'https://central.sonatype.com/repository/maven-snapshots' }
}
dependencies {
@@ -583,7 +583,7 @@ dependencies {
}
```
-JavaDoc snapshots are available at http://reactivex.io/RxJava/3.x/javadoc/snapshot
+JavaDoc snapshots are available at https://reactivex.io/RxJava/3.x/javadoc/snapshot
## Build
diff --git a/build.gradle b/build.gradle
index 39703ec95e..8bcfddb717 100644
--- a/build.gradle
+++ b/build.gradle
@@ -4,12 +4,13 @@ plugins {
id("eclipse")
id("jacoco")
id("maven-publish")
- id("ru.vyarus.animalsniffer") version "2.0.0"
- id("me.champeau.gradle.jmh") version "0.5.3"
+ id("ru.vyarus.animalsniffer") version "2.0.1"
+ id("me.champeau.jmh") version "0.7.3"
id("com.github.hierynomus.license") version "0.16.1"
id("biz.aQute.bnd.builder") version "6.4.0"
- id("com.vanniktech.maven.publish") version "0.19.0"
- id("org.beryx.jar") version "1.2.0"
+ id("com.vanniktech.maven.publish") version "0.33.0"
+ id("org.beryx.jar") version "2.0.0"
+ id("signing")
}
ext {
@@ -18,7 +19,7 @@ ext {
testNgVersion = "7.5"
mockitoVersion = "4.11.0"
jmhLibVersion = "1.21"
- guavaVersion = "33.4.8-jre"
+ guavaVersion = "33.5.0-jre"
}
def releaseTag = System.getenv("BUILD_TAG")
@@ -49,7 +50,16 @@ dependencies {
testImplementation "com.google.guava:guava:$guavaVersion"
}
+def buildWith11 = System.getenv("BUILD_WITH_11")
java {
+ toolchain {
+ vendor = JvmVendorSpec.ADOPTIUM
+ if ("true".equals(buildWith11)) {
+ languageVersion = JavaLanguageVersion.of(11)
+ } else {
+ languageVersion = JavaLanguageVersion.of(8)
+ }
+ }
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
}
@@ -86,12 +96,19 @@ animalsniffer {
annotation = "io.reactivex.rxjava3.internal.util.SuppressAnimalSniffer"
}
+moduleConfig {
+ moduleInfoPath = 'src/main/module/module-info.java'
+ multiReleaseVersion = 9
+ version = project.version
+}
+
jar {
from('.') {
include 'LICENSE'
include 'COPYRIGHT'
into('META-INF/')
}
+ exclude("module-info.class")
// Cover for bnd still not supporting MR Jars: https://github.com/bndtools/bnd/issues/2227
bnd('-fixupmessages': '^Classes found in the wrong directory: \\\\{META-INF/versions/9/module-info\\\\.class=module-info}$')
@@ -106,8 +123,6 @@ jar {
"Bundle-SymbolicName": "io.reactivex.rxjava3.rxjava",
"Multi-Release": "true"
)
-
- moduleInfoPath = 'src/main/module/module-info.java'
}
license {
@@ -126,8 +141,8 @@ jmh {
jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"]
if (project.hasProperty("jmh")) {
- include = [".*" + project.jmh + ".*"]
- logger.info("JMH: {}", include)
+ includes = [".*" + project.jmh + ".*"]
+ logger.info("JMH: {}", includes)
}
}
@@ -166,8 +181,9 @@ jacocoTestReport {
dependsOn testNG
reports {
- xml.enabled = true
- html.enabled = true
+ xml.required.set(true)
+ csv.required.set(false)
+ html.required.set(true)
}
}
@@ -179,44 +195,25 @@ checkstyle {
"checkstyle.suppressions.file": project.file("config/checkstyle/suppressions.xml"),
"checkstyle.header.file" : project.file("config/license/HEADER_JAVA")
]
+ checkstyleMain.exclude '**/module-info.java'
}
if (project.hasProperty("releaseMode")) {
logger.lifecycle("ReleaseMode: {}", project.releaseMode)
- /*
- if ("branch" == project.releaseMode) {
-
- if (version.endsWith("-SNAPSHOT")) {
- publishing {
- repositories {
- maven {
- url = "https://s01.oss.sonatype.org/content/repositories/snapshots/"
- }
- }
- }
-
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- }
- }
- */
if ("full" == project.releaseMode) {
signing {
if (project.hasProperty("SIGNING_PRIVATE_KEY") && project.hasProperty("SIGNING_PASSWORD")) {
useInMemoryPgpKeys(project.getProperty("SIGNING_PRIVATE_KEY"), project.getProperty("SIGNING_PASSWORD"))
+ sign(publishing.publications)
}
}
- /*
- mavenPublish {
- nexus {
- stagingProfile = "io.reactivex"
- }
- }
- */
+ }
+ mavenPublishing {
+ // or when publishing to https://central.sonatype.com/
+ publishToMavenCentral(com.vanniktech.maven.publish.SonatypeHost.CENTRAL_PORTAL)
+
+ // signAllPublications()
}
}
diff --git a/docs/Additional-Reading.md b/docs/Additional-Reading.md
index 85e7d47077..4badd81308 100644
--- a/docs/Additional-Reading.md
+++ b/docs/Additional-Reading.md
@@ -3,7 +3,7 @@ A more complete and up-to-date list of resources can be found at the [reactivex.
# Introducing Reactive Programming
* [Introduction to Rx](http://www.introtorx.com/): a free, on-line book by Lee Campbell **(1.x)**
* [The introduction to Reactive Programming you've been missing](https://gist.github.com/staltz/868e7e9bc2a7b8c1f754) by Andre Staltz
-* [Mastering Observables](http://docs.couchbase.com/developer/java-2.0/observables.html) from the Couchbase documentation **(1.x)**
+* [Mastering Observables](https://docs.huihoo.com/couchbase/developer-guide/java-2.0/observables.html) from the Couchbase documentation **(1.x)**
* [Reactive Programming in Java 8 With RxJava](http://pluralsight.com/training/Courses/TableOfContents/reactive-programming-java-8-rxjava), a course designed by Russell Elledge **(1.x)**
* [33rd Degree Reactive Java](http://www.slideshare.net/tkowalcz/33rd-degree-reactive-java) by Tomasz Kowalczewski **(1.x)**
* [What Every Hipster Should Know About Functional Reactive Programming](http://www.infoq.com/presentations/game-functional-reactive-programming) - Bodil Stokke demos the creation of interactive game mechanics in RxJS
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index c7d437bbb4..3c44eb1b6f 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.14-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
diff --git a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
index c4a0a385a5..6e6ae7e3c6 100644
--- a/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/core/BinaryFlatMapPerf.java
@@ -139,9 +139,9 @@ public Observable extends Integer> apply(Integer v) {
}
});
- singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
+ singleFlatMapHideObservable = Single.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservableHide;
}
});
@@ -153,16 +153,16 @@ public Iterable extends Integer> apply(Integer v) {
}
});
- maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
+ maybeFlatMapObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservable;
}
});
- maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
+ maybeFlatMapHideObservable = Maybe.just(1).flatMapObservable(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return arrayObservableHide;
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
index 87ee5a07e4..4310ea2e95 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableConcatMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.concatMapMaybe(new Function>() {
+ flowableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
index 8ab19a00c6..699f76c074 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybeEmptyPerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapMaybe(new Function>() {
+ flowableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
index d0f3730b42..f81ed10ec3 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapMaybe(new Function>() {
+ flowableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
index 4f50938647..5a92bf20ff 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableFlatMapSinglePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.flatMapSingle(new Function>() {
+ flowableDedicated = source.flatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
index 83ad00e0f9..46ce694f6d 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybeEmptyPerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapMaybe(new Function>() {
+ flowableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
index e36b49c4d3..e96bbc3919 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapMaybePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapMaybe(new Function>() {
+ flowableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
index 0da6941895..ef06ebfa66 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/FlowableSwitchMapSinglePerf.java
@@ -60,9 +60,9 @@ public Publisher extends Integer> apply(Integer v) {
}
});
- flowableDedicated = source.switchMapSingle(new Function>() {
+ flowableDedicated = source.switchMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
index 48b20dc005..2229eed77a 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
index 4528c90b50..cfde5183e5 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- concatMapToObservableEmpty = source.concatMap(new Function>() {
+ concatMapToObservableEmpty = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.concatMapMaybe(new Function>() {
+ observableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
index 204020abfe..75e7506724 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.concatMapMaybe(new Function>() {
+ observableDedicated = source.concatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
index e2e34b24f5..4227791222 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableConcatMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.concatMap(new Function>() {
+ observablePlain = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.concatMap(new Function>() {
+ observableConvert = source.concatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.concatMapSingle(new Function>() {
+ observableDedicated = source.concatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
index b6daa57eb6..6a916a68f1 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
index 5d0327fa46..377a8bba93 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.flatMapMaybe(new Function>() {
+ observableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
index e2a7c43bea..248ca98112 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.flatMapMaybe(new Function>() {
+ observableDedicated = source.flatMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
index add0cd310c..880da95f5a 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableFlatMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.flatMap(new Function>() {
+ observablePlain = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.flatMap(new Function>() {
+ observableConvert = source.flatMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.flatMapSingle(new Function>() {
+ observableDedicated = source.flatMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
index 69b8e71f18..41964c3dbd 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapCompletablePerf.java
@@ -45,16 +45,16 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Completable.complete().toObservable();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
index 3930534eb8..6a4ea5c73b 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybeEmptyPerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.empty();
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.empty().toObservable();
}
});
- observableDedicated = source.switchMapMaybe(new Function>() {
+ observableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.empty();
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
index 30158d012d..f0c3285890 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapMaybePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Maybe.just(v).toObservable();
}
});
- observableDedicated = source.switchMapMaybe(new Function>() {
+ observableDedicated = source.switchMapMaybe(new Function>() {
@Override
- public Maybe extends Integer> apply(Integer v) {
+ public Maybe apply(Integer v) {
return Maybe.just(v);
}
});
diff --git a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
index 75aeb504f9..087f32c8e3 100644
--- a/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
+++ b/src/jmh/java/io/reactivex/rxjava3/xmapz/ObservableSwitchMapSinglePerf.java
@@ -45,23 +45,23 @@ public void setup() {
Observable source = Observable.fromArray(sourceArray);
- observablePlain = source.switchMap(new Function>() {
+ observablePlain = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Observable.just(v);
}
});
- observableConvert = source.switchMap(new Function>() {
+ observableConvert = source.switchMap(new Function>() {
@Override
- public Observable extends Integer> apply(Integer v) {
+ public Observable apply(Integer v) {
return Single.just(v).toObservable();
}
});
- observableDedicated = source.switchMapSingle(new Function>() {
+ observableDedicated = source.switchMapSingle(new Function>() {
@Override
- public Single extends Integer> apply(Integer v) {
+ public Single apply(Integer v) {
return Single.just(v);
}
});
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
index 469d0dd48b..a7de73213a 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
index daa9edd533..99e11259a6 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCache.java
@@ -24,23 +24,7 @@
*
* @param the source element type
*/
-public final class ObservableCache extends AbstractObservableWithUpstream
-implements Observer {
-
- /**
- * The subscription to the source should happen at most once.
- */
- final AtomicBoolean once;
-
- /**
- * The number of items per cached nodes.
- */
- final int capacityHint;
-
- /**
- * The current known array of observer state to notify.
- */
- final AtomicReference[]> observers;
+public final class ObservableCache extends AbstractObservableWithUpstream {
/**
* A shared instance of an empty array of observers to avoid creating
@@ -56,61 +40,49 @@ public final class ObservableCache extends AbstractObservableWithUpstream head;
-
- /**
- * The current tail of the linked structure holding the items.
- */
- Node tail;
-
- /**
- * How many items have been put into the tail node so far.
+ * The subscription to the source should happen at most once.
*/
- int tailOffset;
+ final AtomicBoolean once;
/**
- * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null.
+ * Responsible caching events from the source and multicasting them to each downstream.
*/
- Throwable error;
+ final Multicaster multicaster;
/**
- * True if the source has terminated.
+ * The first node in a singly linked list. Each node has the capacity to hold a specific number of events, and each
+ * points exclusively to the next node (if present). When a new downstream arrives, the subscription is
+ * initialized with a reference to the "head" node, and any events present in the linked list are replayed. As
+ * events are replayed to the new downstream, its 'node' reference advances through the linked list, discarding each
+ * node reference once all events in that node have been replayed. Consequently, once {@code this} instance goes out
+ * of scope, the prefix of nodes up to the first node that is still being replayed becomes unreachable and eligible
+ * for collection.
*/
- volatile boolean done;
+ final Node head;
/**
* Constructs an empty, non-connected cache.
* @param source the source to subscribe to for the first incoming observer
* @param capacityHint the number of items expected (reduce allocation frequency)
*/
- @SuppressWarnings("unchecked")
public ObservableCache(Observable source, int capacityHint) {
super(source);
- this.capacityHint = capacityHint;
this.once = new AtomicBoolean();
Node n = new Node<>(capacityHint);
this.head = n;
- this.tail = n;
- this.observers = new AtomicReference<>(EMPTY);
+ this.multicaster = new Multicaster<>(capacityHint, n);
}
@Override
protected void subscribeActual(Observer super T> t) {
- CacheDisposable consumer = new CacheDisposable<>(t, this);
+ CacheDisposable consumer = new CacheDisposable<>(t, multicaster, head);
t.onSubscribe(consumer);
- add(consumer);
+ multicaster.add(consumer);
if (!once.get() && once.compareAndSet(false, true)) {
- source.subscribe(this);
+ source.subscribe(multicaster);
} else {
- replay(consumer);
+ multicaster.replay(consumer);
}
}
@@ -127,7 +99,7 @@ protected void subscribeActual(Observer super T> t) {
* @return true if the cache has observers
*/
/* public */ boolean hasObservers() {
- return observers.get().length != 0;
+ return multicaster.get().length != 0;
}
/**
@@ -135,194 +107,241 @@ protected void subscribeActual(Observer super T> t) {
* @return the number of currently cached event count
*/
/* public */ long cachedEventCount() {
- return size;
+ return multicaster.size;
}
- /**
- * Atomically adds the consumer to the {@link #observers} copy-on-write array
- * if the source has not yet terminated.
- * @param consumer the consumer to add
- */
- void add(CacheDisposable consumer) {
- for (;;) {
- CacheDisposable[] current = observers.get();
- if (current == TERMINATED) {
- return;
- }
- int n = current.length;
+ static final class Multicaster extends AtomicReference[]> implements Observer {
- @SuppressWarnings("unchecked")
- CacheDisposable[] next = new CacheDisposable[n + 1];
- System.arraycopy(current, 0, next, 0, n);
- next[n] = consumer;
+ /** */
+ private static final long serialVersionUID = 8514643269016498691L;
- if (observers.compareAndSet(current, next)) {
- return;
- }
- }
- }
+ /**
+ * The number of items per cached nodes.
+ */
+ final int capacityHint;
- /**
- * Atomically removes the consumer from the {@link #observers} copy-on-write array.
- * @param consumer the consumer to remove
- */
- @SuppressWarnings("unchecked")
- void remove(CacheDisposable consumer) {
- for (;;) {
- CacheDisposable[] current = observers.get();
- int n = current.length;
- if (n == 0) {
- return;
- }
+ /**
+ * The total number of elements in the list available for reads.
+ */
+ volatile long size;
- int j = -1;
- for (int i = 0; i < n; i++) {
- if (current[i] == consumer) {
- j = i;
- break;
- }
- }
+ /**
+ * The current tail of the linked structure holding the items.
+ */
+ Node tail;
- if (j < 0) {
- return;
- }
- CacheDisposable[] next;
+ /**
+ * How many items have been put into the tail node so far.
+ */
+ int tailOffset;
- if (n == 1) {
- next = EMPTY;
- } else {
- next = new CacheDisposable[n - 1];
- System.arraycopy(current, 0, next, 0, j);
- System.arraycopy(current, j + 1, next, j, n - j - 1);
- }
+ /**
+ * If the observers are {@link #TERMINATED}, this holds the terminal error if not null.
+ */
+ Throwable error;
- if (observers.compareAndSet(current, next)) {
- return;
- }
- }
- }
+ /**
+ * True if the source has terminated.
+ */
+ volatile boolean done;
- /**
- * Replays the contents of this cache to the given consumer based on its
- * current state and number of items requested by it.
- * @param consumer the consumer to continue replaying items to
- */
- void replay(CacheDisposable consumer) {
- // make sure there is only one replay going on at a time
- if (consumer.getAndIncrement() != 0) {
- return;
+ @SuppressWarnings("unchecked")
+ Multicaster(int capacityHint, final Node head) {
+ super(EMPTY);
+ this.tail = head;
+ this.capacityHint = capacityHint;
}
- // see if there were more replay request in the meantime
- int missed = 1;
- // read out state into locals upfront to avoid being re-read due to volatile reads
- long index = consumer.index;
- int offset = consumer.offset;
- Node node = consumer.node;
- Observer super T> downstream = consumer.downstream;
- int capacity = capacityHint;
-
- for (;;) {
- // if the consumer got disposed, clear the node and quit
- if (consumer.disposed) {
- consumer.node = null;
- return;
+ /**
+ * Atomically adds the consumer to the observers copy-on-write array
+ * if the source has not yet terminated.
+ * @param consumer the consumer to add
+ */
+ void add(CacheDisposable consumer) {
+ for (;;) {
+ CacheDisposable[] current = get();
+ if (current == TERMINATED) {
+ return;
+ }
+ int n = current.length;
+
+ @SuppressWarnings("unchecked")
+ CacheDisposable[] next = new CacheDisposable[n + 1];
+ System.arraycopy(current, 0, next, 0, n);
+ next[n] = consumer;
+
+ if (compareAndSet(current, next)) {
+ return;
+ }
}
+ }
- // first see if the source has terminated, read order matters!
- boolean sourceDone = done;
- // and if the number of items is the same as this consumer has received
- boolean empty = size == index;
-
- // if the source is done and we have all items so far, terminate the consumer
- if (sourceDone && empty) {
- // release the node object to avoid leaks through retained consumers
- consumer.node = null;
- // if error is not null then the source failed
- Throwable ex = error;
- if (ex != null) {
- downstream.onError(ex);
+ /**
+ * Atomically removes the consumer from the observers copy-on-write array.
+ * @param consumer the consumer to remove
+ */
+ @SuppressWarnings("unchecked")
+ void remove(CacheDisposable consumer) {
+ for (;;) {
+ CacheDisposable[] current = get();
+ int n = current.length;
+ if (n == 0) {
+ return;
+ }
+
+ int j = -1;
+ for (int i = 0; i < n; i++) {
+ if (current[i] == consumer) {
+ j = i;
+ break;
+ }
+ }
+
+ if (j < 0) {
+ return;
+ }
+ CacheDisposable[] next;
+
+ if (n == 1) {
+ next = EMPTY;
} else {
- downstream.onComplete();
+ next = new CacheDisposable[n - 1];
+ System.arraycopy(current, 0, next, 0, j);
+ System.arraycopy(current, j + 1, next, j, n - j - 1);
}
+
+ if (compareAndSet(current, next)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Replays the contents of this cache to the given consumer based on its
+ * current state and number of items requested by it.
+ * @param consumer the consumer to continue replaying items to
+ */
+ void replay(CacheDisposable consumer) {
+ // make sure there is only one replay going on at a time
+ if (consumer.getAndIncrement() != 0) {
return;
}
- // there are still items not sent to the consumer
- if (!empty) {
- // if the offset in the current node has reached the node capacity
- if (offset == capacity) {
- // switch to the subsequent node
- node = node.next;
- // reset the in-node offset
- offset = 0;
+ // see if there were more replay request in the meantime
+ int missed = 1;
+ // read out state into locals upfront to avoid being re-read due to volatile reads
+ long index = consumer.index;
+ int offset = consumer.offset;
+ Node node = consumer.node;
+ Observer super T> downstream = consumer.downstream;
+ int capacity = capacityHint;
+
+ for (;;) {
+ // if the consumer got disposed, clear the node and quit
+ if (consumer.disposed) {
+ consumer.node = null;
+ return;
}
- // emit the cached item
- downstream.onNext(node.values[offset]);
-
- // move the node offset forward
- offset++;
- // move the total consumed item count forward
- index++;
+ // first see if the source has terminated, read order matters!
+ boolean sourceDone = done;
+ // and if the number of items is the same as this consumer has received
+ boolean empty = size == index;
+
+ // if the source is done and we have all items so far, terminate the consumer
+ if (sourceDone && empty) {
+ // release the node object to avoid leaks through retained consumers
+ consumer.node = null;
+ // if error is not null then the source failed
+ Throwable ex = error;
+ if (ex != null) {
+ downstream.onError(ex);
+ } else {
+ downstream.onComplete();
+ }
+ return;
+ }
- // retry for the next item/terminal event if any
- continue;
- }
+ // there are still items not sent to the consumer
+ if (!empty) {
+ // if the offset in the current node has reached the node capacity
+ if (offset == capacity) {
+ // switch to the subsequent node
+ node = node.next;
+ // reset the in-node offset
+ offset = 0;
+ }
+
+ // emit the cached item
+ downstream.onNext(node.values[offset]);
+
+ // move the node offset forward
+ offset++;
+ // move the total consumed item count forward
+ index++;
+
+ // retry for the next item/terminal event if any
+ continue;
+ }
- // commit the changed references back
- consumer.index = index;
- consumer.offset = offset;
- consumer.node = node;
- // release the changes and see if there were more replay request in the meantime
- missed = consumer.addAndGet(-missed);
- if (missed == 0) {
- break;
+ // commit the changed references back
+ consumer.index = index;
+ consumer.offset = offset;
+ consumer.node = node;
+ // release the changes and see if there were more replay request in the meantime
+ missed = consumer.addAndGet(-missed);
+ if (missed == 0) {
+ break;
+ }
}
}
- }
- @Override
- public void onSubscribe(Disposable d) {
- // we can't do much with the upstream disposable
- }
-
- @Override
- public void onNext(T t) {
- int tailOffset = this.tailOffset;
- // if the current tail node is full, create a fresh node
- if (tailOffset == capacityHint) {
- Node n = new Node<>(tailOffset);
- n.values[0] = t;
- this.tailOffset = 1;
- tail.next = n;
- tail = n;
- } else {
- tail.values[tailOffset] = t;
- this.tailOffset = tailOffset + 1;
+ @Override
+ public void onSubscribe(Disposable d) {
+ // we can't do much with the upstream disposable
}
- size++;
- for (CacheDisposable consumer : observers.get()) {
- replay(consumer);
+
+ @Override
+ public void onNext(T t) {
+ int tailOffset = this.tailOffset;
+ // if the current tail node is full, create a fresh node
+ if (tailOffset == capacityHint) {
+ Node n = new Node<>(tailOffset);
+ n.values[0] = t;
+ this.tailOffset = 1;
+ tail.next = n;
+ tail = n;
+ } else {
+ tail.values[tailOffset] = t;
+ this.tailOffset = tailOffset + 1;
+ }
+ size++;
+ for (CacheDisposable consumer : get()) {
+ replay(consumer);
+ }
}
- }
- @SuppressWarnings("unchecked")
- @Override
- public void onError(Throwable t) {
- error = t;
- done = true;
- for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) {
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onError(Throwable t) {
+ error = t;
+ done = true;
+ // No additional events will arrive, so now we can clear the 'tail' reference
+ tail = null;
+ for (CacheDisposable consumer : getAndSet(TERMINATED)) {
+ replay(consumer);
+ }
}
- }
- @SuppressWarnings("unchecked")
- @Override
- public void onComplete() {
- done = true;
- for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) {
- replay(consumer);
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onComplete() {
+ done = true;
+ // No additional events will arrive, so now we can clear the 'tail' reference
+ tail = null;
+ for (CacheDisposable consumer : getAndSet(TERMINATED)) {
+ replay(consumer);
+ }
}
}
@@ -338,7 +357,7 @@ static final class CacheDisposable extends AtomicInteger
final Observer super T> downstream;
- final ObservableCache parent;
+ final Multicaster parent;
Node node;
@@ -353,11 +372,12 @@ static final class CacheDisposable extends AtomicInteger
* the parent cache object.
* @param downstream the actual consumer
* @param parent the parent that holds onto the cached items
+ * @param head the first node in the linked list
*/
- CacheDisposable(Observer super T> downstream, ObservableCache parent) {
+ CacheDisposable(Observer super T> downstream, Multicaster parent, Node head) {
this.downstream = downstream;
this.parent = parent;
- this.node = parent.head;
+ this.node = head;
}
@Override
diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
index 1801cce1f2..7c01c23f90 100644
--- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
+++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableDelay.java
@@ -111,7 +111,9 @@ final class OnNext implements Runnable {
@Override
public void run() {
- downstream.onNext(t);
+ if (!w.isDisposed()) {
+ downstream.onNext(t);
+ }
}
}
diff --git a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
index 4d5eb3335f..e103594ba5 100644
--- a/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
+++ b/src/main/java/io/reactivex/rxjava3/subjects/ReplaySubject.java
@@ -652,8 +652,6 @@ static final class UnboundedReplayBuffer
final List