Skip to content

Commit 84be8cc

Browse files
committed
Add push command
// FREEBIE
1 parent 2a7e2be commit 84be8cc

4 files changed

Lines changed: 189 additions & 1 deletion

File tree

src/main/java/org/whispersystems/textsecuregcm/WhisperServerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@
8989
import org.whispersystems.textsecuregcm.workers.DeleteUserCommand;
9090
import org.whispersystems.textsecuregcm.workers.DirectoryCommand;
9191
import org.whispersystems.textsecuregcm.workers.PeriodicStatsCommand;
92+
import org.whispersystems.textsecuregcm.workers.PushCommand;
9293
import org.whispersystems.textsecuregcm.workers.TrimMessagesCommand;
9394
import org.whispersystems.textsecuregcm.workers.VacuumCommand;
9495
import org.whispersystems.websocket.WebSocketResourceProviderFactory;
@@ -123,6 +124,7 @@ public void initialize(Bootstrap<WhisperServerConfiguration> bootstrap) {
123124
bootstrap.addCommand(new TrimMessagesCommand());
124125
bootstrap.addCommand(new PeriodicStatsCommand());
125126
bootstrap.addCommand(new DeleteUserCommand());
127+
bootstrap.addCommand(new PushCommand());
126128
bootstrap.addBundle(new NameableMigrationsBundle<WhisperServerConfiguration>("accountdb", "accountsdb.xml") {
127129
@Override
128130
public DataSourceFactory getDataSourceFactory(WhisperServerConfiguration configuration) {

src/main/java/org/whispersystems/textsecuregcm/push/PushSender.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class PushSender implements Managed {
4141

4242
private final Logger logger = LoggerFactory.getLogger(PushSender.class);
4343

44-
private static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}";
44+
public static final String APN_PAYLOAD = "{\"aps\":{\"sound\":\"default\",\"badge\":%d,\"alert\":{\"loc-key\":\"APN_Message\"}}}";
4545

4646
private final ApnFallbackManager apnFallbackManager;
4747
private final PushServiceClient pushServiceClient;

src/main/java/org/whispersystems/textsecuregcm/storage/Messages.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.skife.jdbi.v2.tweak.ResultSetMapper;
1313
import org.whispersystems.textsecuregcm.entities.MessageProtos.Envelope;
1414
import org.whispersystems.textsecuregcm.entities.OutgoingMessageEntity;
15+
import org.whispersystems.textsecuregcm.util.Pair;
1516

1617
import java.lang.annotation.Annotation;
1718
import java.lang.annotation.ElementType;
@@ -56,6 +57,10 @@ abstract OutgoingMessageEntity remove(@Bind("destination") String destina
5657
@Bind("source") String source,
5758
@Bind("timestamp") long timestamp);
5859

60+
@Mapper(DestinationMapper.class)
61+
@SqlQuery("SELECT DISTINCT ON (destination, destination_device) destination, destination_device FROM messages WHERE timestamp > :timestamp ORDER BY destination, destination_device OFFSET :offset LIMIT :limit")
62+
public abstract List<Pair<String, Integer>> getPendingDestinations(@Bind("timestamp") long sinceTimestamp, @Bind("offset") int offset, @Bind("limit") int limit);
63+
5964
@Mapper(MessageMapper.class)
6065
@SqlUpdate("DELETE FROM messages WHERE " + ID + " = :id AND " + DESTINATION + " = :destination")
6166
abstract void remove(@Bind("destination") String destination, @Bind("id") long id);
@@ -72,6 +77,14 @@ abstract OutgoingMessageEntity remove(@Bind("destination") String destina
7277
@SqlUpdate("VACUUM messages")
7378
public abstract void vacuum();
7479

80+
public static class DestinationMapper implements ResultSetMapper<Pair<String, Integer>> {
81+
82+
@Override
83+
public Pair<String, Integer> map(int i, ResultSet resultSet, StatementContext statementContext) throws SQLException {
84+
return new Pair<>(resultSet.getString(DESTINATION), resultSet.getInt(DESTINATION_DEVICE));
85+
}
86+
}
87+
7588
public static class MessageMapper implements ResultSetMapper<OutgoingMessageEntity> {
7689
@Override
7790
public OutgoingMessageEntity map(int i, ResultSet resultSet, StatementContext statementContext)
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package org.whispersystems.textsecuregcm.workers;
2+
3+
import com.fasterxml.jackson.databind.DeserializationFeature;
4+
import com.google.common.base.Optional;
5+
import net.sourceforge.argparse4j.inf.Namespace;
6+
import net.sourceforge.argparse4j.inf.Subparser;
7+
import org.glassfish.jersey.client.ClientProperties;
8+
import org.skife.jdbi.v2.DBI;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
import org.whispersystems.textsecuregcm.WhisperServerConfiguration;
12+
import org.whispersystems.textsecuregcm.entities.ApnMessage;
13+
import org.whispersystems.textsecuregcm.entities.GcmMessage;
14+
import org.whispersystems.textsecuregcm.providers.RedisClientFactory;
15+
import org.whispersystems.textsecuregcm.push.ApnFallbackManager;
16+
import org.whispersystems.textsecuregcm.push.PushSender;
17+
import org.whispersystems.textsecuregcm.push.PushServiceClient;
18+
import org.whispersystems.textsecuregcm.push.TransientPushFailureException;
19+
import org.whispersystems.textsecuregcm.storage.Account;
20+
import org.whispersystems.textsecuregcm.storage.Accounts;
21+
import org.whispersystems.textsecuregcm.storage.AccountsManager;
22+
import org.whispersystems.textsecuregcm.storage.Device;
23+
import org.whispersystems.textsecuregcm.storage.DirectoryManager;
24+
import org.whispersystems.textsecuregcm.storage.Messages;
25+
import org.whispersystems.textsecuregcm.util.Pair;
26+
import org.whispersystems.textsecuregcm.util.Util;
27+
28+
import javax.ws.rs.client.Client;
29+
import java.util.List;
30+
import java.util.concurrent.TimeUnit;
31+
32+
import io.dropwizard.Application;
33+
import io.dropwizard.cli.EnvironmentCommand;
34+
import io.dropwizard.client.JerseyClientBuilder;
35+
import io.dropwizard.jdbi.DBIFactory;
36+
import io.dropwizard.setup.Environment;
37+
import redis.clients.jedis.JedisPool;
38+
39+
public class PushCommand extends EnvironmentCommand<WhisperServerConfiguration> {
40+
41+
private final Logger logger = LoggerFactory.getLogger(DirectoryCommand.class);
42+
43+
private static final int LIMIT = 1000;
44+
45+
public PushCommand() {
46+
super(new Application<WhisperServerConfiguration>() {
47+
@Override
48+
public void run(WhisperServerConfiguration configuration, Environment environment)
49+
throws Exception
50+
{
51+
52+
}
53+
}, "push", "send pushes");
54+
}
55+
56+
@Override
57+
public void configure(Subparser subparser) {
58+
super.configure(subparser);
59+
subparser.addArgument("-t", "--time")
60+
.dest("timestamp")
61+
.type(Long.class)
62+
.required(true)
63+
.help("The starting timestamp to notify users from");
64+
65+
subparser.addArgument("-o", "--offset")
66+
.dest("offset")
67+
.type(Integer.class)
68+
.required(true)
69+
.help("The starting offset in the user query");
70+
}
71+
72+
@Override
73+
protected void run(Environment environment, Namespace namespace,
74+
WhisperServerConfiguration configuration)
75+
throws Exception
76+
{
77+
try {
78+
long timestampStart = namespace.getLong("timestamp");
79+
int offset = namespace.getInt("offset");
80+
81+
environment.getObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
82+
83+
DBIFactory dbiFactory = new DBIFactory();
84+
DBI database = dbiFactory.build(environment, configuration.getDataSourceFactory(), "accountdb" );
85+
DBI messagedb = dbiFactory.build(environment, configuration.getMessageStoreConfiguration(), "messagedb");
86+
87+
Accounts accounts = database.onDemand(Accounts.class);
88+
Messages messages = messagedb.onDemand(Messages.class);
89+
90+
JedisPool cacheClient = new RedisClientFactory(configuration.getCacheConfiguration().getUrl()).getRedisClientPool();
91+
JedisPool redisClient = new RedisClientFactory(configuration.getDirectoryConfiguration().getUrl()).getRedisClientPool();
92+
DirectoryManager directory = new DirectoryManager(redisClient);
93+
AccountsManager accountsManager = new AccountsManager(accounts, directory, cacheClient);
94+
95+
Client httpClient = initializeHttpClient(environment, configuration);
96+
PushServiceClient pushServiceClient = new PushServiceClient(httpClient, configuration.getPushConfiguration());
97+
98+
while (true) {
99+
List<Pair<String, Integer>> pendingDestinations = messages.getPendingDestinations(timestampStart, offset, LIMIT);
100+
101+
if (pendingDestinations == null || pendingDestinations.size() == 0) {
102+
break;
103+
}
104+
105+
for (Pair<String, Integer> pendingDestination : pendingDestinations) {
106+
Optional<Account> account = accountsManager.get(pendingDestination.first());
107+
108+
if (account.isPresent()) {
109+
Optional<Device> device = account.get().getDevice(pendingDestination.second());
110+
111+
if (device.isPresent()) {
112+
if (device.get().getGcmId() != null) {
113+
sendGcm(pushServiceClient, account.get(), device.get());
114+
} else if (device.get().getApnId() != null) {
115+
sendApn(pushServiceClient, account.get(), device.get());
116+
}
117+
} else {
118+
logger.warn("No device found: " + pendingDestination.first() + ", " + pendingDestination.second());
119+
}
120+
} else {
121+
logger.warn("No account found: " + pendingDestination.first());
122+
}
123+
}
124+
125+
logger.warn("Processed " + LIMIT + "...");
126+
offset += LIMIT;
127+
}
128+
129+
logger.warn("Finished!");
130+
131+
} catch (Exception ex) {
132+
logger.warn("Exception", ex);
133+
}
134+
}
135+
136+
private void sendGcm(PushServiceClient pushServiceClient, Account account, Device device) {
137+
try {
138+
GcmMessage gcmMessage = new GcmMessage(device.getGcmId(), account.getNumber(),
139+
(int)device.getId(), "", false, true);
140+
141+
logger.warn("Sending GCM: " + account.getNumber());
142+
pushServiceClient.send(gcmMessage);
143+
} catch (TransientPushFailureException e) {
144+
logger.warn("Push failure", e);
145+
}
146+
}
147+
148+
private void sendApn(PushServiceClient pushServiceClient, Account account, Device device) {
149+
if (!Util.isEmpty(device.getVoipApnId())) {
150+
try {
151+
ApnMessage apnMessage = new ApnMessage(device.getVoipApnId(), account.getNumber(), (int)device.getId(),
152+
String.format(PushSender.APN_PAYLOAD, 1),
153+
true, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(ApnFallbackManager.FALLBACK_DURATION));
154+
155+
logger.warn("Sending APN: " + account.getNumber());
156+
pushServiceClient.send(apnMessage);
157+
} catch (TransientPushFailureException e) {
158+
logger.warn("SILENT PUSH LOSS", e);
159+
}
160+
}
161+
}
162+
163+
private Client initializeHttpClient(Environment environment, WhisperServerConfiguration config) {
164+
Client httpClient = new JerseyClientBuilder(environment).using(config.getJerseyClientConfiguration())
165+
.build(getName());
166+
167+
httpClient.property(ClientProperties.CONNECT_TIMEOUT, 1000);
168+
httpClient.property(ClientProperties.READ_TIMEOUT, 1000);
169+
170+
return httpClient;
171+
}
172+
173+
}

0 commit comments

Comments
 (0)