basic stream managment functionality

This commit is contained in:
Daniel Gultsch 2014-03-10 19:22:13 +01:00
parent e441005c87
commit 08023210ba
19 changed files with 303 additions and 123 deletions

View file

@ -20,7 +20,7 @@ import android.util.Log;
import eu.siacs.conversations.entities.Account;
import eu.siacs.conversations.persistance.DatabaseBackend;
import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xmpp.MessagePacket;
import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
import net.java.otr4j.OtrEngineHost;
import net.java.otr4j.OtrException;

View file

@ -5,7 +5,7 @@ import java.util.List;
import eu.siacs.conversations.entities.MucOptions.User;
import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xmpp.PresencePacket;
import eu.siacs.conversations.xmpp.stanzas.PresencePacket;
import android.annotation.SuppressLint;
import android.util.Log;

View file

@ -35,15 +35,15 @@ import eu.siacs.conversations.utils.OnPhoneContactsLoadedListener;
import eu.siacs.conversations.utils.PhoneHelper;
import eu.siacs.conversations.utils.UIHelper;
import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xmpp.IqPacket;
import eu.siacs.conversations.xmpp.MessagePacket;
import eu.siacs.conversations.xmpp.OnIqPacketReceived;
import eu.siacs.conversations.xmpp.OnMessagePacketReceived;
import eu.siacs.conversations.xmpp.OnPresencePacketReceived;
import eu.siacs.conversations.xmpp.OnStatusChanged;
import eu.siacs.conversations.xmpp.OnTLSExceptionReceived;
import eu.siacs.conversations.xmpp.PresencePacket;
import eu.siacs.conversations.xmpp.XmppConnection;
import eu.siacs.conversations.xmpp.stanzas.IqPacket;
import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
import eu.siacs.conversations.xmpp.stanzas.PresencePacket;
import android.app.AlarmManager;
import android.app.PendingIntent;
import android.app.Service;
@ -126,7 +126,9 @@ public class XmppConnectionService extends Service {
|| (packet.hasChild("sent"))) {
message = MessageParser.parseCarbonMessage(packet, account,
service);
message.getConversation().markRead();
if (message!=null) {
message.getConversation().markRead();
}
notify = false;
}
@ -195,13 +197,6 @@ public class XmppConnectionService extends Service {
if (convChangedListener != null) {
convChangedListener.onConversationListChanged();
}
if (account.getKeys().has("pgp_signature")) {
try {
sendPgpPresence(account, account.getKeys().getString("pgp_signature"));
} catch (JSONException e) {
//
}
}
scheduleWakeupCall(PING_INTERVAL, true);
} else if (account.getStatus() == Account.STATUS_OFFLINE) {
Log.d(LOGTAG,"onStatusChanged offline");
@ -445,7 +440,7 @@ public class XmppConnectionService extends Service {
super.onDestroy();
for (Account account : accounts) {
if (account.getXmppConnection() != null) {
disconnect(account);
disconnect(account,true);
}
}
}
@ -864,7 +859,7 @@ public class XmppConnectionService extends Service {
public void deleteAccount(Account account) {
Log.d(LOGTAG, "called delete account");
if (account.getXmppConnection() != null) {
this.disconnect(account);
this.disconnect(account,false);
}
databaseBackend.deleteAccount(account);
this.accounts.remove(account);
@ -954,7 +949,7 @@ public class XmppConnectionService extends Service {
packet.setAttribute("to", conversation.getContactJid().split("/")[0]+"/"+nick);
packet.setAttribute("from", conversation.getAccount().getFullJid());
packet = conversation.getAccount().getXmppConnection().sendPresencePacket(packet, new OnPresencePacketReceived() {
conversation.getAccount().getXmppConnection().sendPresencePacket(packet, new OnPresencePacketReceived() {
@Override
public void onPresencePacketReceived(Account account, PresencePacket packet) {
@ -992,7 +987,7 @@ public class XmppConnectionService extends Service {
conversation.getMucOptions().setOffline();
}
public void disconnect(Account account) {
public void disconnect(final Account account, boolean blocking) {
List<Conversation> conversations = getConversations();
for (int i = 0; i < conversations.size(); i++) {
Conversation conversation = conversations.get(i);
@ -1009,9 +1004,21 @@ public class XmppConnectionService extends Service {
}
}
}
account.getXmppConnection().disconnect();
Log.d(LOGTAG, "disconnected account: " + account.getJid());
account.setXmppConnection(null);
if (!blocking) {
new Thread(new Runnable() {
@Override
public void run() {
account.getXmppConnection().disconnect(false);
Log.d(LOGTAG, "disconnected account: " + account.getJid());
account.setXmppConnection(null);
}
}).start();
} else {
account.getXmppConnection().disconnect(false);
Log.d(LOGTAG, "disconnected account: " + account.getJid());
account.setXmppConnection(null);
}
}
@Override
@ -1134,20 +1141,27 @@ public class XmppConnectionService extends Service {
this.tlsException = null;
}
public void reconnectAccount(Account account) {
if (account.getXmppConnection() != null) {
disconnect(account);
}
if (!account.isOptionSet(Account.OPTION_DISABLED)) {
if (account.getXmppConnection()==null) {
account.setXmppConnection(this.createConnection(account));
public void reconnectAccount(final Account account) {
new Thread(new Runnable() {
@Override
public void run() {
if (account.getXmppConnection() != null) {
disconnect(account,true);
}
if (!account.isOptionSet(Account.OPTION_DISABLED)) {
if (account.getXmppConnection()==null) {
account.setXmppConnection(createConnection(account));
}
Thread thread = new Thread(account.getXmppConnection());
thread.start();
}
}
Thread thread = new Thread(account.getXmppConnection());
thread.start();
}
}).start();
}
public void ping(final Account account,final int timeout) {
account.getXmppConnection().r();
Log.d(LOGTAG,account.getJid()+": sending ping");
IqPacket iq = new IqPacket(IqPacket.TYPE_GET);
Element ping = new Element("ping");

View file

@ -332,6 +332,9 @@ public class ConversationFragment extends Fragment {
final ConversationActivity activity = (ConversationActivity) getActivity();
activity.registerListener();
this.conversation = activity.getSelectedConversation();
if (this.conversation==null) {
return;
}
this.selfBitmap = findSelfPicture();
updateMessages();
// rendering complete. now go tell activity to close pane
@ -374,7 +377,6 @@ public class ConversationFragment extends Fragment {
public void updateMessages() {
ConversationActivity activity = (ConversationActivity) getActivity();
List<Message> encryptedMessages = new LinkedList<Message>();
// TODO this.conversation could be null?!
for(Message message : this.conversation.getMessages()) {
if (message.getEncryption() == Message.ENCRYPTION_PGP) {
encryptedMessages.add(message);

View file

@ -11,7 +11,7 @@ import eu.siacs.conversations.entities.Conversation;
import eu.siacs.conversations.entities.Message;
import eu.siacs.conversations.services.XmppConnectionService;
import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xmpp.MessagePacket;
import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
public class MessageParser {

View file

@ -5,25 +5,29 @@ import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.util.concurrent.LinkedBlockingQueue;
import android.util.Log;
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
public class TagWriter {
private OutputStreamWriter outputStream;
private LinkedBlockingQueue<String> writeQueue = new LinkedBlockingQueue<String>();
private Thread writer = new Thread() {
public boolean shouldStop = false;
private boolean finshed = false;
private LinkedBlockingQueue<AbstractStanza> writeQueue = new LinkedBlockingQueue<AbstractStanza>();
private Thread asyncStanzaWriter = new Thread() {
private boolean shouldStop = false;
@Override
public void run() {
while(!shouldStop) {
if ((finshed)&&(writeQueue.size() == 0)) {
return;
}
try {
String output = writeQueue.take();
outputStream.write(output);
AbstractStanza output = writeQueue.take();
outputStream.write(output.toString());
outputStream.flush();
} catch (IOException e) {
Log.d("xmppService", "error writing to stream");
shouldStop = true;
} catch (InterruptedException e) {
shouldStop = true;
}
}
}
@ -31,34 +35,49 @@ public class TagWriter {
public TagWriter() {
}
public TagWriter(OutputStream out) {
this.setOutputStream(out);
writer.start();
}
public void setOutputStream(OutputStream out) {
this.outputStream = new OutputStreamWriter(out);
if (!writer.isAlive()) writer.start();
}
public TagWriter beginDocument() {
writeQueue.add("<?xml version='1.0'?>");
public TagWriter beginDocument() throws IOException {
outputStream.write("<?xml version='1.0'?>");
outputStream.flush();
return this;
}
public TagWriter writeTag(Tag tag) {
writeQueue.add(tag.toString());
public TagWriter writeTag(Tag tag) throws IOException {
outputStream.write(tag.toString());
outputStream.flush();
return this;
}
public void writeString(String string) {
writeQueue.add(string);
public TagWriter writeElement(Element element) throws IOException {
outputStream.write(element.toString());
outputStream.flush();
return this;
}
public void writeElement(Element element) {
writeQueue.add(element.toString());
public TagWriter writeStanzaAsync(AbstractStanza stanza) {
if (finshed) {
return this;
} else {
if (!asyncStanzaWriter.isAlive()) asyncStanzaWriter.start();
writeQueue.add(stanza);
return this;
}
}
public void finish() {
this.finshed = true;
}
public boolean finished() {
return (this.writeQueue.size() == 0);
}
}

View file

@ -1,6 +1,7 @@
package eu.siacs.conversations.xmpp;
import eu.siacs.conversations.entities.Account;
import eu.siacs.conversations.xmpp.stanzas.IqPacket;
public interface OnIqPacketReceived extends PacketReceived {
public void onIqPacketReceived(Account account, IqPacket packet);

View file

@ -1,6 +1,7 @@
package eu.siacs.conversations.xmpp;
import eu.siacs.conversations.entities.Account;
import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
public interface OnMessagePacketReceived extends PacketReceived {
public void onMessagePacketReceived(Account account, MessagePacket packet);

View file

@ -1,6 +1,7 @@
package eu.siacs.conversations.xmpp;
import eu.siacs.conversations.entities.Account;
import eu.siacs.conversations.xmpp.stanzas.PresencePacket;
public interface OnPresencePacketReceived extends PacketReceived {
public void onPresencePacketReceived(Account account, PresencePacket packet);

View file

@ -1,13 +0,0 @@
package eu.siacs.conversations.xmpp;
import eu.siacs.conversations.xml.Element;
public class PresencePacket extends Element {
private PresencePacket(String name) {
super("presence");
}
public PresencePacket() {
super("presence");
}
}

View file

@ -26,8 +26,10 @@ import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.net.ssl.X509TrustManager;
import org.json.JSONException;
import org.xmlpull.v1.XmlPullParserException;
import android.content.IntentSender.SendIntentException;
import android.os.Bundle;
import android.os.PowerManager;
import android.util.Log;
@ -38,6 +40,14 @@ import eu.siacs.conversations.xml.Element;
import eu.siacs.conversations.xml.Tag;
import eu.siacs.conversations.xml.TagWriter;
import eu.siacs.conversations.xml.XmlReader;
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
import eu.siacs.conversations.xmpp.stanzas.IqPacket;
import eu.siacs.conversations.xmpp.stanzas.MessagePacket;
import eu.siacs.conversations.xmpp.stanzas.PresencePacket;
import eu.siacs.conversations.xmpp.stanzas.streammgmt.AckPacket;
import eu.siacs.conversations.xmpp.stanzas.streammgmt.EnablePacket;
import eu.siacs.conversations.xmpp.stanzas.streammgmt.RequestPacket;
import eu.siacs.conversations.xmpp.stanzas.streammgmt.ResumePacket;
public class XmppConnection implements Runnable {
@ -57,6 +67,11 @@ public class XmppConnection implements Runnable {
private Element streamFeatures;
private HashSet<String> discoFeatures = new HashSet<String>();
private String streamId = null;
private int stanzasReceived = 0;
private int stanzasSent = 0;
private static final int PACKET_IQ = 0;
private static final int PACKET_MESSAGE = 1;
private static final int PACKET_PRESENCE = 2;
@ -176,6 +191,34 @@ public class XmppConnection implements Runnable {
} else if (nextTag.isStart("failure")) {
Element failure = tagReader.readElement(nextTag);
changeStatus(Account.STATUS_UNAUTHORIZED);
} else if (nextTag.isStart("enabled")) {
this.stanzasSent = 0;
Element enabled = tagReader.readElement(nextTag);
if ("true".equals(enabled.getAttribute("resume"))) {
this.streamId = enabled.getAttribute("id");
Log.d(LOGTAG,account.getJid()+": stream managment enabled (resumable)");
} else {
Log.d(LOGTAG,account.getJid()+": stream managment enabled");
}
this.stanzasReceived = 0;
RequestPacket r = new RequestPacket();
tagWriter.writeStanzaAsync(r);
} else if (nextTag.isStart("resumed")) {
tagReader.readElement(nextTag);
changeStatus(Account.STATUS_ONLINE);
Log.d(LOGTAG,account.getJid()+": session resumed");
} else if (nextTag.isStart("r")) {
tagReader.readElement(nextTag);
AckPacket ack = new AckPacket(this.stanzasReceived);
//Log.d(LOGTAG,ack.toString());
tagWriter.writeStanzaAsync(ack);
} else if (nextTag.isStart("a")) {
Element ack = tagReader.readElement(nextTag);
int serverSequence = Integer.parseInt(ack.getAttribute("h"));
if (serverSequence>this.stanzasSent) {
this.stanzasSent = serverSequence;
}
//Log.d(LOGTAG,"server ack"+ack.toString()+" ("+this.stanzasSent+")");
} else if (nextTag.isStart("iq")) {
processIq(nextTag);
} else if (nextTag.isStart("message")) {
@ -221,6 +264,7 @@ public class XmppConnection implements Runnable {
}
nextTag = tagReader.readTag();
}
++stanzasReceived;
return element;
}
@ -271,7 +315,7 @@ public class XmppConnection implements Runnable {
}
}
private void sendStartTLS() {
private void sendStartTLS() throws IOException {
Tag startTLS = Tag.empty("starttls");
startTLS.setAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-tls");
tagWriter.writeTag(startTLS);
@ -378,25 +422,45 @@ public class XmppConnection implements Runnable {
} else if (this.streamFeatures.hasChild("mechanisms")
&& shouldAuthenticate) {
sendSaslAuth();
}
if (this.streamFeatures.hasChild("bind") && shouldBind) {
} else if (this.streamFeatures.hasChild("sm") && streamId != null) {
Log.d(LOGTAG,"found old stream id. trying to remuse");
ResumePacket resume = new ResumePacket(this.streamId,stanzasReceived);
this.tagWriter.writeStanzaAsync(resume);
} else if (this.streamFeatures.hasChild("bind") && shouldBind) {
sendBindRequest();
if (this.streamFeatures.hasChild("session")) {
Log.d(LOGTAG,"sending session");
IqPacket startSession = new IqPacket(IqPacket.TYPE_SET);
Element session = new Element("session");
session.setAttribute("xmlns",
"urn:ietf:params:xml:ns:xmpp-session");
session.setContent("");
startSession.addChild(session);
sendIqPacket(startSession, null);
tagWriter.writeElement(startSession);
this.sendIqPacket(startSession, null);
}
Element presence = new Element("presence");
tagWriter.writeElement(presence);
}
}
private void sendInitialPresence() {
PresencePacket packet = new PresencePacket();
packet.setAttribute("from", account.getFullJid());
if (account.getKeys().has("pgp_signature")) {
try {
String signature = account.getKeys().getString("pgp_signature");
Element status = new Element("status");
status.setContent("online");
packet.addChild(status);
Element x = new Element("x");
x.setAttribute("xmlns", "jabber:x:signed");
x.setContent(signature);
packet.addChild(x);
} catch (JSONException e) {
//
}
}
this.sendPresencePacket(packet);
}
private void sendBindRequest() throws IOException {
IqPacket iq = new IqPacket(IqPacket.TYPE_SET);
Element bind = new Element("bind");
@ -412,10 +476,15 @@ public class XmppConnection implements Runnable {
.getContent().split("/")[1];
account.setResource(resource);
account.setStatus(Account.STATUS_ONLINE);
if (streamFeatures.hasChild("sm")) {
EnablePacket enable = new EnablePacket();
tagWriter.writeStanzaAsync(enable);
}
sendInitialPresence();
sendServiceDiscovery();
if (statusListener != null) {
statusListener.onStatusChanged(account);
}
sendServiceDiscovery();
}
});
}
@ -471,7 +540,7 @@ public class XmppConnection implements Runnable {
Log.d(LOGTAG, "processStreamError");
}
private void sendStartStream() {
private void sendStartStream() throws IOException {
Tag stream = Tag.start("stream:stream");
stream.setAttribute("from", account.getJid());
stream.setAttribute("to", account.getServer());
@ -489,39 +558,36 @@ public class XmppConnection implements Runnable {
public void sendIqPacket(IqPacket packet, OnIqPacketReceived callback) {
String id = nextRandomId();
packet.setAttribute("id", id);
tagWriter.writeElement(packet);
if (callback != null) {
packetCallbacks.put(id, callback);
}
this.sendPacket(packet, callback);
}
public void sendMessagePacket(MessagePacket packet) {
this.sendMessagePacket(packet, null);
this.sendPacket(packet, null);
}
public void sendMessagePacket(MessagePacket packet,
OnMessagePacketReceived callback) {
String id = nextRandomId();
packet.setAttribute("id", id);
tagWriter.writeElement(packet);
if (callback != null) {
packetCallbacks.put(id, callback);
}
this.sendPacket(packet, callback);
}
public void sendPresencePacket(PresencePacket packet) {
this.sendPresencePacket(packet, null);
this.sendPacket(packet, null);
}
public PresencePacket sendPresencePacket(PresencePacket packet,
public void sendPresencePacket(PresencePacket packet,
OnPresencePacketReceived callback) {
String id = nextRandomId();
packet.setAttribute("id", id);
tagWriter.writeElement(packet);
this.sendPacket(packet, callback);
}
private synchronized void sendPacket(final AbstractStanza packet, PacketReceived callback) {
++stanzasSent;
tagWriter.writeStanzaAsync(packet);
if (callback != null) {
packetCallbacks.put(id, callback);
if (packet.getId()==null) {
packet.setId(nextRandomId());
}
packetCallbacks.put(packet.getId(), callback);
}
return packet;
}
public void setOnMessagePacketReceivedListener(
@ -547,8 +613,23 @@ public class XmppConnection implements Runnable {
this.tlsListener = listener;
}
public void disconnect() {
public void disconnect(boolean force) {
Log.d(LOGTAG,"disconnecting");
try {
if (force) {
socket.close();
}
tagWriter.finish();
while(!tagWriter.finished()) {
Log.d(LOGTAG,"not yet finished");
Thread.sleep(100);
}
tagWriter.writeTag(Tag.end("stream:stream"));
} catch (IOException e) {
Log.d(LOGTAG,"io exception during disconnect");
} catch (InterruptedException e) {
Log.d(LOGTAG,"interupted while waiting for disconnect");
}
}
public boolean hasFeatureRosterManagment() {
@ -558,4 +639,8 @@ public class XmppConnection implements Runnable {
return this.streamFeatures.hasChild("ver");
}
}
public void r() {
this.tagWriter.writeStanzaAsync(new RequestPacket());
}
}

View file

@ -0,0 +1,34 @@
package eu.siacs.conversations.xmpp.stanzas;
import eu.siacs.conversations.xml.Element;
public class AbstractStanza extends Element {
protected AbstractStanza(String name) {
super(name);
}
public String getTo() {
return getAttribute("to");
}
public String getFrom() {
return getAttribute("from");
}
public String getId() {
return this.getAttribute("id");
}
public void setTo(String to) {
setAttribute("to", to);
}
public void setFrom(String from) {
setAttribute("from",from);
}
public void setId(String id) {
setAttribute("id",id);
}
}

View file

@ -1,8 +1,7 @@
package eu.siacs.conversations.xmpp;
package eu.siacs.conversations.xmpp.stanzas;
import eu.siacs.conversations.xml.Element;
public class IqPacket extends Element {
public class IqPacket extends AbstractStanza {
public static final int TYPE_SET = 0;
public static final int TYPE_RESULT = 1;
@ -33,8 +32,4 @@ public class IqPacket extends Element {
super("iq");
}
public String getId() {
return this.getAttribute("id");
}
}

View file

@ -1,30 +1,18 @@
package eu.siacs.conversations.xmpp;
package eu.siacs.conversations.xmpp.stanzas;
import eu.siacs.conversations.xml.Element;
public class MessagePacket extends Element {
public class MessagePacket extends AbstractStanza {
public static final int TYPE_CHAT = 0;
public static final int TYPE_UNKNOWN = 1;
public static final int TYPE_NO = 2;
public static final int TYPE_GROUPCHAT = 3;
public static final int TYPE_ERROR = 4;
private MessagePacket(String name) {
super(name);
}
public MessagePacket() {
super("message");
}
public String getTo() {
return getAttribute("to");
}
public String getFrom() {
return getAttribute("from");
}
public String getBody() {
Element body = this.findChild("body");
if (body!=null) {
@ -34,14 +22,6 @@ public class MessagePacket extends Element {
}
}
public void setTo(String to) {
setAttribute("to", to);
}
public void setFrom(String from) {
setAttribute("from",from);
}
public void setBody(String text) {
this.children.remove(findChild("body"));
Element body = new Element("body");

View file

@ -0,0 +1,9 @@
package eu.siacs.conversations.xmpp.stanzas;
public class PresencePacket extends AbstractStanza {
public PresencePacket() {
super("presence");
}
}

View file

@ -0,0 +1,13 @@
package eu.siacs.conversations.xmpp.stanzas.streammgmt;
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
public class AckPacket extends AbstractStanza {
public AckPacket(int sequence) {
super("a");
this.setAttribute("xmlns","urn:xmpp:sm:3");
this.setAttribute("h", ""+sequence);
}
}

View file

@ -0,0 +1,13 @@
package eu.siacs.conversations.xmpp.stanzas.streammgmt;
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
public class EnablePacket extends AbstractStanza {
public EnablePacket() {
super("enable");
this.setAttribute("xmlns","urn:xmpp:sm:3");
this.setAttribute("resume", "true");
}
}

View file

@ -0,0 +1,12 @@
package eu.siacs.conversations.xmpp.stanzas.streammgmt;
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
public class RequestPacket extends AbstractStanza {
public RequestPacket() {
super("r");
this.setAttribute("xmlns","urn:xmpp:sm:3");
}
}

View file

@ -0,0 +1,14 @@
package eu.siacs.conversations.xmpp.stanzas.streammgmt;
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
public class ResumePacket extends AbstractStanza {
public ResumePacket(String id, int sequence) {
super("resume");
this.setAttribute("xmlns","urn:xmpp:sm:3");
this.setAttribute("previd", id);
this.setAttribute("h", ""+sequence);
}
}