Projects tigase _server server-core Commits 3a623f74
ctrl k
  • src/main/java/tigase/cluster/AmpComponentClustered.java
    ■ ■ ■ ■ ■
    skipped 38 lines
    39 39  import tigase.cluster.api.CommandListenerAbstract;
    40 40  import tigase.server.Message;
    41 41  import tigase.server.Packet;
     42 +import tigase.server.Priority;
    42 43  import tigase.server.amp.AmpComponent;
    43 44  import static tigase.server.amp.AmpFeatureIfc.FROM_CONN_ID;
    44 45  import tigase.util.TigaseStringprepException;
    skipped 58 lines
    103 104   protected class PacketForwardCommand extends CommandListenerAbstract {
    104 105   
    105 106   public PacketForwardCommand(String name) {
    106  - super(name);
     107 + super(name, Priority.HIGH);
    107 108   }
    108 109   
    109 110   @Override
    skipped 17 lines
  • src/main/java/tigase/cluster/ClusterConnection.java
    ■ ■ ■ ■ ■ ■
     1 +/*
     2 + * ClusterConnection.java
     3 + *
     4 + * Tigase Jabber/XMPP Server
     5 + * Copyright (C) 2004-2015 "Tigase, Inc." <office@tigase.com>
     6 + *
     7 + * This program is free software: you can redistribute it and/or modify
     8 + * it under the terms of the GNU Affero General Public License as published by
     9 + * the Free Software Foundation, either version 3 of the License,
     10 + * or (at your option) any later version.
     11 + *
     12 + * This program is distributed in the hope that it will be useful,
     13 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     15 + * GNU Affero General Public License for more details.
     16 + *
     17 + * You should have received a copy of the GNU Affero General Public License
     18 + * along with this program. Look for COPYING file in the top folder.
     19 + * If not, see http://www.gnu.org/licenses/.
     20 + *
     21 + */
     22 +package tigase.cluster;
     23 + 
     24 +import java.util.List;
     25 +import java.util.concurrent.CopyOnWriteArrayList;
     26 +import tigase.xmpp.XMPPIOService;
     27 + 
     28 +/**
     29 + *
     30 + * @author andrzej
     31 + */
     32 +public class ClusterConnection {
     33 +
     34 + private final String addr;
     35 + private final CopyOnWriteArrayList<XMPPIOService<Object>> conns = new CopyOnWriteArrayList<>();
     36 +
     37 + public ClusterConnection(String addr) {
     38 + this.addr = addr;
     39 + }
     40 +
     41 + public void addConn(XMPPIOService<Object> conn) {
     42 + conns.add(conn);
     43 + }
     44 +
     45 + public void removeConn(XMPPIOService<Object> conn) {
     46 + conns.remove(conn);
     47 + }
     48 +
     49 + public int size() {
     50 + return conns.size();
     51 + }
     52 +
     53 + public List<XMPPIOService<Object>> getConnections() {
     54 + return conns;
     55 + }
     56 + 
     57 + @Override
     58 + public String toString() {
     59 + return addr + conns;
     60 + }
     61 +
     62 +}
     63 + 
  • src/main/java/tigase/cluster/ClusterConnectionManager.java
    ■ ■ ■ ■ ■ ■
    skipped 25 lines
    26 26   
    27 27  //~--- non-JDK imports --------------------------------------------------------
    28 28   
     29 +import tigase.cluster.api.ClusterConnectionHandler;
    29 30  import java.net.InetAddress;
    30 31  import java.net.NetworkInterface;
    31 32  import java.net.SocketException;
    skipped 16 lines
    48 49  import javax.script.Bindings;
    49 50   
    50 51  import tigase.cluster.api.ClusterCommandException;
     52 +import tigase.cluster.api.ClusterConnectionSelectorIfc;
    51 53  import tigase.cluster.api.ClusterControllerIfc;
    52 54  import tigase.cluster.api.ClusterElement;
    53 55  import tigase.cluster.api.ClusteredComponentIfc;
    skipped 42 lines
    96 98   */
    97 99  public class ClusterConnectionManager
    98 100   extends ConnectionManager<XMPPIOService<Object>>
    99  - implements ClusteredComponentIfc, RepositoryChangeListenerIfc<ClusterRepoItem> {
     101 + implements ClusteredComponentIfc, RepositoryChangeListenerIfc<ClusterRepoItem>, ClusterConnectionHandler {
    100 102   /** Field description */
    101 103   public static final String CLCON_REPO_CLASS_PROP_KEY = "repository-class";
    102 104   
    skipped 13 lines
    116 118   "cluster-connections-per-node";
    117 119   
    118 120   /** Field description */
    119  - public static final int CLUSTER_CONNECTIONS_PER_NODE_VAL = 2;
     121 + public static final int CLUSTER_CONNECTIONS_PER_NODE_VAL = 3;
     122 + 
     123 + /** Field description */
     124 + public static final String CLUSTER_CONNECTIONS_SELECTOR_KEY = "connection-selector";
     125 + /** Field description */
     126 + public static final String DEF_CLUSTER_CONNECTIONS_SELECTOR_VAL = ClusterConnectionSelector.class.getCanonicalName();
    120 127  
    121 128   /** Field description */
    122 129   public static final String CLUSTER_CONTR_ID_PROP_KEY = "cluster-controller-id";
    skipped 61 lines
    184 191   new IOServiceStatisticsGetter();
    185 192   private String identity_type =
    186 193   IDENTITY_TYPE_VAL;
    187  - private Map<String, CopyOnWriteArrayList<XMPPIOService<Object>>> connectionsPool =
     194 + private Map<String, ClusterConnection> connectionsPool =
    188 195   new ConcurrentSkipListMap<>();
    189 196   private boolean connect_all = CONNECT_ALL_PROP_VAL;
    190 197   private boolean compress_stream = COMPRESS_STREAM_PROP_VAL;
    skipped 10 lines
    201 208   
    202 209   // private long packetsSent = 0;
    203 210   // private long packetsReceived = 0;
     211 + private ClusterConnectionSelectorIfc connectionSelector = null;
    204 212   private CommandListener sendPacket = new SendPacket(ClusterControllerIfc
    205 213   .DELIVER_CLUSTER_PACKET_CMD);
    206 214   private boolean nonClusterTrafficAllowed = true;
    skipped 293 lines
    500 508   Map<String, Object> sessionData = service.getSessionData();
    501 509   String[] routings = (String[]) sessionData.get( PORT_ROUTING_TABLE_PROP_KEY );
    502 510   String addr = (String) sessionData.get( PORT_REMOTE_HOST_PROP_KEY );
    503  - CopyOnWriteArrayList<XMPPIOService<Object>> conns = connectionsPool.get( addr );
     511 + ClusterConnection conns = connectionsPool.get( addr );
    504 512   
    505 513   if (conns == null) {
    506  - conns = new CopyOnWriteArrayList<>();
     514 + conns = new ClusterConnection(addr);
    507 515   connectionsPool.put(addr, conns);
    508 516   }
    509 517   
    510 518   int size = conns.size();
    511 519   
    512  - conns.remove( service );
     520 + conns.removeConn( service );
    513 521   if ( log.isLoggable( Level.FINEST ) ){
    514 522   log.log( Level.FINEST, "serviceStopped: result={0} / size={1} / connPool={2} / serv={3} / conns={4} / type={5}",
    515 523   new Object[] { result, size, connectionsPool, service, conns, service.connectionType() } );
    skipped 191 lines
    707 715   props.put(WATCHDOG_DELAY, 30 * SECOND);
    708 716   props.put(WATCHDOG_TIMEOUT, -1 * SECOND);
    709 717  
     718 + props.put(CLUSTER_CONNECTIONS_SELECTOR_KEY, DEF_CLUSTER_CONNECTIONS_SELECTOR_VAL);
     719 +
    710 720   if (getDefHostName().toString().equalsIgnoreCase( "localhost") ) {
    711 721   TigaseRuntime.getTigaseRuntime().shutdownTigase( new String [] {
    712 722   "",
    skipped 79 lines
    792 802   if (props.get(CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY) != null) {
    793 803   per_node_conns = (Integer) props.get(CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY);
    794 804   }
     805 + 
     806 + if (props.containsKey(CLUSTER_CONNECTIONS_SELECTOR_KEY)) {
     807 + String selectorClsName = (String) props.get(CLUSTER_CONNECTIONS_SELECTOR_KEY);
     808 + try {
     809 + ClusterConnectionSelectorIfc tmp_selector = (ClusterConnectionSelectorIfc) ModulesManagerImpl.getInstance().forName(selectorClsName).newInstance();
     810 + tmp_selector.setClusterConnectionHandler(this);
     811 + tmp_selector.setProperties(props);
     812 + connectionSelector = tmp_selector;
     813 + } catch (InstantiationException|ClassNotFoundException|IllegalAccessException ex) {
     814 + log.log(Level.SEVERE, "Coulnd not create instance of cluster connection selector of class " + selectorClsName, ex);
     815 + }
     816 + }
     817 +
    795 818   connectionDelay = 5 * SECOND;
    796 819   if ((props.size() == 1) || isInitializationComplete()) {
    797 820   super.setProperties(props);
    skipped 46 lines
    844 867   String[] routings = (String[]) serv.getSessionData().get(PORT_ROUTING_TABLE_PROP_KEY);
    845 868   String addr = (String) serv.getSessionData().get(PORT_REMOTE_HOST_PROP_KEY);
    846 869   
    847  - CopyOnWriteArrayList<XMPPIOService<Object>> conns = connectionsPool.get(addr);
     870 + ClusterConnection conns = connectionsPool.get(addr);
    848 871   
    849 872   if (conns == null) {
    850  - conns = new CopyOnWriteArrayList<XMPPIOService<Object>>();
     873 + conns = new ClusterConnection(addr);
    851 874   connectionsPool.put(addr, conns);
    852 875   }
    853 876   
    skipped 7 lines
    861 884   // setting userJid to hostname of remote cluster node
    862 885   serv.setUserJid((String) serv.getSessionData().get(PORT_REMOTE_HOST_PROP_KEY));
    863 886  
    864  - conns.add( serv );
     887 + conns.addConn(serv );
    865 888   if ( size == 0 && conns.size() > 0 ){
    866 889   updateRoutings(routings, true);
    867 890   log.log(Level.INFO, "Connected to: {0}", addr);
    skipped 16 lines
    884 907   
    885 908   // ++packetsSent;
    886 909   String ip = p.getTo().getDomain();
     910 + ClusterConnection conns = connectionsPool.get(ip);
    887 911   
    888  - int code = Math.abs(hashCodeForPacket(p));
    889  - CopyOnWriteArrayList<XMPPIOService<Object>> conns = connectionsPool.get(ip);
    890  - 
    891  - if ((conns != null) && (conns.size() > 0)) {
    892  - XMPPIOService<Object> serv = conns.get(code % conns.size());
    893  - 
     912 + XMPPIOService<Object> serv = connectionSelector.selectConnection(p, conns);
     913 + if (serv != null) {
    894 914   return super.writePacketToSocket(serv, p);
    895 915   } else {
    896 916   log.log(Level.WARNING, "No cluster connection to send a packet: {0}", p);
    skipped 249 lines
    1146 1166   private class SendPacket
    1147 1167   extends CommandListenerAbstract {
    1148 1168   private SendPacket(String name) {
    1149  - super(name);
     1169 + super(name, null);
    1150 1170   }
    1151 1171   
    1152 1172   //~--- methods ------------------------------------------------------------
    skipped 61 lines
  • src/main/java/tigase/cluster/ClusterConnectionSelector.java
    ■ ■ ■ ■ ■ ■
     1 +/*
     2 + * ClusterConnectionSelector.java
     3 + *
     4 + * Tigase Jabber/XMPP Server
     5 + * Copyright (C) 2004-2015 "Tigase, Inc." <office@tigase.com>
     6 + *
     7 + * This program is free software: you can redistribute it and/or modify
     8 + * it under the terms of the GNU Affero General Public License as published by
     9 + * the Free Software Foundation, either version 3 of the License,
     10 + * or (at your option) any later version.
     11 + *
     12 + * This program is distributed in the hope that it will be useful,
     13 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     15 + * GNU Affero General Public License for more details.
     16 + *
     17 + * You should have received a copy of the GNU Affero General Public License
     18 + * along with this program. Look for COPYING file in the top folder.
     19 + * If not, see http://www.gnu.org/licenses/.
     20 + *
     21 + */
     22 +package tigase.cluster;
     23 + 
     24 +import java.util.List;
     25 +import java.util.Map;
     26 +import tigase.cluster.api.ClusterConnectionHandler;
     27 +import tigase.cluster.api.ClusterConnectionSelectorIfc;
     28 +import tigase.server.Packet;
     29 +import tigase.server.Priority;
     30 +import tigase.xmpp.XMPPIOService;
     31 + 
     32 +/**
     33 + * Advanced implementation of ClusterConnectionSelectorIfc which separates packets
     34 + * with priority CLUSTER or higher from other packets in cluster connections
     35 + * by using separate connections for them
     36 + *
     37 + * @author andrzej
     38 + */
     39 +public class ClusterConnectionSelector implements ClusterConnectionSelectorIfc {
     40 + 
     41 + protected static final String CLUSTER_SYS_CONNECTIONS_PER_NODE_PROP_KEY = "cluster-sys-connections-per-node";
     42 +
     43 + private int allConns = ClusterConnectionManager.CLUSTER_CONNECTIONS_PER_NODE_VAL;
     44 + private int sysConns = 1;
     45 + private ClusterConnectionHandler handler;
     46 +
     47 + @Override
     48 + public XMPPIOService<Object> selectConnection(Packet p, ClusterConnection conn) {
     49 + if (conn == null)
     50 + return null;
     51 +
     52 + int code = Math.abs(handler.hashCodeForPacket(p));
     53 + List<XMPPIOService<Object>> conns = conn.getConnections();
     54 + if (conns.size() > 0) {
     55 + if (conns.size() > sysConns) {
     56 + if (p.getPriority() != null && p.getPriority().ordinal() <= Priority.CLUSTER.ordinal()) {
     57 + return conns.get(code % sysConns);
     58 + } else {
     59 + return conns.get(sysConns + (code % (conns.size() - sysConns)));
     60 + }
     61 + } else {
     62 + return conns.get(code % conns.size());
     63 + }
     64 + }
     65 + return null;
     66 + }
     67 +
     68 + @Override
     69 + public void setClusterConnectionHandler(ClusterConnectionHandler handler) {
     70 + this.handler = handler;
     71 + }
     72 +
     73 + @Override
     74 + public void setProperties(Map<String,Object> props) {
     75 + if (props.containsKey(CLUSTER_SYS_CONNECTIONS_PER_NODE_PROP_KEY)) {
     76 + sysConns = (Integer) props.get(CLUSTER_SYS_CONNECTIONS_PER_NODE_PROP_KEY);
     77 + }
     78 + if (props.containsKey(ClusterConnectionManager.CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY)) {
     79 + allConns = (Integer) props.get(ClusterConnectionManager.CLUSTER_CONNECTIONS_PER_NODE_PROP_KEY);
     80 + }
     81 + }
     82 +}
     83 + 
  • src/main/java/tigase/cluster/ClusterConnectionSelectorOld.java
    ■ ■ ■ ■ ■ ■
     1 +/*
     2 + * ClusterConnectionSelectorOld.java
     3 + *
     4 + * Tigase Jabber/XMPP Server
     5 + * Copyright (C) 2004-2015 "Tigase, Inc." <office@tigase.com>
     6 + *
     7 + * This program is free software: you can redistribute it and/or modify
     8 + * it under the terms of the GNU Affero General Public License as published by
     9 + * the Free Software Foundation, either version 3 of the License,
     10 + * or (at your option) any later version.
     11 + *
     12 + * This program is distributed in the hope that it will be useful,
     13 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     15 + * GNU Affero General Public License for more details.
     16 + *
     17 + * You should have received a copy of the GNU Affero General Public License
     18 + * along with this program. Look for COPYING file in the top folder.
     19 + * If not, see http://www.gnu.org/licenses/.
     20 + *
     21 + */
     22 +package tigase.cluster;
     23 + 
     24 +import java.util.List;
     25 +import java.util.Map;
     26 +import tigase.cluster.api.ClusterConnectionHandler;
     27 +import tigase.cluster.api.ClusterConnectionSelectorIfc;
     28 +import tigase.server.Packet;
     29 +import tigase.xmpp.XMPPIOService;
     30 + 
     31 +/**
     32 + * ClusterConnectionSelectorOld class implements the old cluster connection selection
     33 + * algorithm, which was previously part of the ClusterConnectionManager class.
     34 + *
     35 + * @author andrzej
     36 + */
     37 +public class ClusterConnectionSelectorOld implements ClusterConnectionSelectorIfc {
     38 + 
     39 + private ClusterConnectionHandler handler;
     40 +
     41 + @Override
     42 + public XMPPIOService<Object> selectConnection(Packet p, ClusterConnection conn) {
     43 + if (conn == null)
     44 + return null;
     45 +
     46 + int code = Math.abs(handler.hashCodeForPacket(p));
     47 + List<XMPPIOService<Object>> conns = conn.getConnections();
     48 + if (conns.size() > 0) {
     49 + return conns.get(code % conns.size());
     50 + }
     51 + return null;
     52 + }
     53 + 
     54 + @Override
     55 + public void setClusterConnectionHandler(ClusterConnectionHandler handler) {
     56 + this.handler = handler;
     57 + }
     58 +
     59 + @Override
     60 + public void setProperties(Map<String,Object> props) {
     61 +
     62 + }
     63 +
     64 +}
     65 + 
  • src/main/java/tigase/cluster/ClusterController.java
    ■ ■ ■ ■ ■ ■
    skipped 43 lines
    44 44  import tigase.conf.ConfigurationException;
    45 45  import tigase.server.AbstractComponentRegistrator;
    46 46  import tigase.server.Packet;
     47 +import tigase.server.Priority;
    47 48  import tigase.server.ServerComponent;
    48 49  import tigase.xml.Element;
    49 50  import tigase.xmpp.JID;
    skipped 110 lines
    160 161   
    161 162   Queue<Element> results = new ArrayDeque<Element>();
    162 163   
     164 + // retrieve the listener for the command and its priority, if available
     165 + CommandListener listener = commandListeners.get(command);
     166 + Priority priority = listener != null ? listener.getPriority() : null;
     167 +
    163 168   // TODO: Maybe more optimal would be creating the object once and then clone
    164 169   // it? However, the 'to' parameter must be double-checked whether all
    165 170   // internal states are set properly for each different to parameter
    skipped 1 lines
    167 172   ClusterElement clel = ClusterElement.createClusterMethodCall(fromNode, to,
    168 173   StanzaType.set, command, data);
    169 174   
     175 + // set priority to ClusterElement so it will get proper priority for processing
     176 + if (priority != null)
     177 + clel.setPriority(priority);
    170 178   clel.addVisitedNodes(visitedNodes);
    171 179   clel.addDataPackets(packets);
    172 180   
    skipped 189 lines
  • src/main/java/tigase/cluster/api/ClusterConnectionHandler.java
    ■ ■ ■ ■ ■ ■
     1 +/*
     2 + * ClusterConnectionHandler.java
     3 + *
     4 + * Tigase Jabber/XMPP Server
     5 + * Copyright (C) 2004-2015 "Tigase, Inc." <office@tigase.com>
     6 + *
     7 + * This program is free software: you can redistribute it and/or modify
     8 + * it under the terms of the GNU Affero General Public License as published by
     9 + * the Free Software Foundation, either version 3 of the License,
     10 + * or (at your option) any later version.
     11 + *
     12 + * This program is distributed in the hope that it will be useful,
     13 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     15 + * GNU Affero General Public License for more details.
     16 + *
     17 + * You should have received a copy of the GNU Affero General Public License
     18 + * along with this program. Look for COPYING file in the top folder.
     19 + * If not, see http://www.gnu.org/licenses/.
     20 + *
     21 + */
     22 +package tigase.cluster.api;
     23 + 
     24 +import tigase.server.Packet;
     25 + 
     26 +/**
     27 + * ClusterConnectionHandler interface used by ClusterConnectionSelectorIfc
     28 + * implementations to separate implementation from ClusterConnectionManager
     29 + *
     30 + * @author andrzej
     31 + */
     32 +public interface ClusterConnectionHandler {
     33 + 
     34 + /**
     35 + * Generates a hash code for the given packet, used to spread processing
     36 + * between threads or connections.
     37 + *
     38 + * @param packet packet for which the hash code is generated
     39 + * @return hash code generated for the packet
     40 + */
     41 + int hashCodeForPacket(Packet packet);
     42 +
     43 +}
     44 + 
  • src/main/java/tigase/cluster/api/ClusterConnectionSelectorIfc.java
    ■ ■ ■ ■ ■ ■
     1 +/*
     2 + * ClusterConnectionSelectorIfc.java
     3 + *
     4 + * Tigase Jabber/XMPP Server
     5 + * Copyright (C) 2004-2015 "Tigase, Inc." <office@tigase.com>
     6 + *
     7 + * This program is free software: you can redistribute it and/or modify
     8 + * it under the terms of the GNU Affero General Public License as published by
     9 + * the Free Software Foundation, either version 3 of the License,
     10 + * or (at your option) any later version.
     11 + *
     12 + * This program is distributed in the hope that it will be useful,
     13 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     15 + * GNU Affero General Public License for more details.
     16 + *
     17 + * You should have received a copy of the GNU Affero General Public License
     18 + * along with this program. Look for COPYING file in the top folder.
     19 + * If not, see http://www.gnu.org/licenses/.
     20 + *
     21 + */
     22 +package tigase.cluster.api;
     23 + 
     24 +import java.util.Map;
     25 +import tigase.cluster.ClusterConnection;
     26 +import tigase.server.Packet;
     27 +import tigase.xmpp.XMPPIOService;
     28 + 
     29 +/**
     30 + * Interface ClusterConnectionSelectorIfc is base interface for classes responsible
     31 + * for selecting connection which should be used to send packet between cluster nodes
     32 + *
     33 + * @author andrzej
     34 + */
     35 +public interface ClusterConnectionSelectorIfc {
     36 +
     37 + /**
     38 + * Returns the XMPPIOService instance which should be used to
     39 + * send the packet between cluster nodes.
     40 + *
     41 + * @param packet packet to be sent
     42 + * @param conn connections available for the destination node
     43 + * @return connection selected for the packet; NOTE(review): the bundled implementations return null when conn is null or holds no connections - confirm this is the intended contract
     44 + */
     45 + XMPPIOService<Object> selectConnection(Packet packet, ClusterConnection conn);
     46 +
     47 + /**
     48 + * Supplies the handler which implementations may use to obtain a hash code for a packet.
     49 + *
     50 + * @param handler handler to use
     51 + */
     52 + void setClusterConnectionHandler(ClusterConnectionHandler handler);
     53 + 
     54 + /**
     55 + * Passes configuration properties to the selector; which keys are read is up to the implementation.
     56 + *
     57 + * @param props configuration properties
     58 + */
     59 + void setProperties(Map<String,Object> props);
     60 +
     61 +}
     62 + 
  • src/main/java/tigase/cluster/api/ClusterElement.java
    ■ ■ ■ ■ ■ ■
    skipped 43 lines
    44 44  import java.util.Map;
    45 45  import java.util.Queue;
    46 46  import java.util.Set;
     47 +import tigase.server.Priority;
    47 48   
    48 49  /**
    49 50   * Class ClusterElement is a utility class for handling tigase cluster specific
    skipped 110 lines
    160 161   private Map<String, String> method_params = null;
    161 162   private Map<String, String> method_results = null;
    162 163   private Queue<Element> packets = null;
     164 + private Priority priority = null;
    163 165   private Set<JID> visited_nodes = null;
    164 166   
    165 167   //~--- constructors ---------------------------------------------------------
    skipped 42 lines
    208 210   if (log.isLoggable(Level.FINEST)) {
    209 211   log.finest("No visited nodes found");
    210 212   }
     213 + }
     214 + 
     215 + String priorityStr = elem.getAttributeStaticStr(Packet.PRIORITY_ATT);
     216 + if (priorityStr != null) {
     217 + priority = Priority.valueOf(priorityStr);
    211 218   }
    212 219  
    213 220   Element method_call = elem.findChildStaticStr(CLUSTER_METHOD_PATH);
    skipped 432 lines
    646 653   }
    647 654   }
    648 655  
     656 + public Priority getPriority() {
     657 + return priority;
     658 + }
     659 + 
    649 660   /**
    650 661   * Method description
    651 662   *
    skipped 14 lines
    666 677   */
    667 678   public boolean isVisitedNode(JID node_id) {
    668 679   return visited_nodes.contains(node_id);
     680 + }
     681 + 
     682 + public void setPriority(Priority priority) {
     683 + this.priority = priority;
     684 + this.elem.setAttribute(Packet.PRIORITY_ATT, priority.name());
    669 685   }
    670 686  
    671 687   //~--- methods --------------------------------------------------------------
    skipped 69 lines
  • src/main/java/tigase/cluster/api/CommandListener.java
    ■ ■ ■ ■ ■ ■
    skipped 36 lines
    37 37  import java.util.Map;
    38 38  import java.util.Queue;
    39 39  import java.util.Set;
     40 +import tigase.server.Priority;
    40 41   
    41 42  /**
    42 43   * @author Artur Hefczyc Created Mar 16, 2011
    skipped 24 lines
    67 68   * @return a value of <code>String</code> name of the command
    68 69   */
    69 70   String getName();
     71 + 
     72 + /**
     73 + * Method returns priority of particular command which should be used
     74 + * to assign proper priority for processing of this command
     75 + *
     76 + * @return
     77 + */
     78 + Priority getPriority();
    70 79  
    71 80   /**
    72 81   * Method allows retrieval possible statistics for particular command
    skipped 15 lines
  • src/main/java/tigase/cluster/api/CommandListenerAbstract.java
    ■ ■ ■ ■ ■ ■
    skipped 39 lines
    40 40  //~--- JDK imports ------------------------------------------------------------
    41 41   
    42 42  import java.util.Map;
     43 +import tigase.server.Priority;
    43 44   
    44 45  /**
    45 46   *
    skipped 7 lines
    53 54   //~--- fields ---------------------------------------------------------------
    54 55   
    55 56   private String commandName;
     57 + private Priority priority;
    56 58   
    57 59   //~--- constructors ---------------------------------------------------------
    58 60   
    skipped 3 lines
    62 64   *
    63 65   * @param name
    64 66   */
    65  - public CommandListenerAbstract(String name) {
     67 + public CommandListenerAbstract(String name, Priority priority) {
    66 68   setName(name);
     69 + setPriority(priority);
    67 70   }
    68 71   
    69 72   //~--- methods --------------------------------------------------------------
    skipped 44 lines
    114 117   }
    115 118   
    116 119   @Override
     120 + public Priority getPriority() {
     121 + return priority;
     122 + }
     123 + 
     124 + @Override
    117 125   public void getStatistics(StatisticsList list) {}
    118 126   
    119 127   /**
    skipped 24 lines
    144 152   public final void setName(String name) {
    145 153   commandName = name;
    146 154   }
     155 + 
     156 + public void setPriority(Priority priority) {
     157 + this.priority = priority;
     158 + }
     159 +
    147 160  }
    148 161   
  • src/main/java/tigase/cluster/strategy/DefaultClusteringStrategy.java
    ■ ■ ■ ■ ■
    skipped 65 lines
    66 66  import java.util.Queue;
    67 67  import java.util.Random;
    68 68  import java.util.Set;
     69 +import tigase.server.Priority;
    69 70   
    70 71  /**
    71 72   * Created: May 13, 2009 9:53:44 AM
    skipped 342 lines
    414 415   * @param name
    415 416   */
    416 417   public UserConnectedCommand(String name) {
    417  - super(name);
     418 + super(name, Priority.CLUSTER);
    418 419   }
    419 420   
    420 421   //~--- methods ------------------------------------------------------------
    skipped 61 lines
    482 483   * @param name
    483 484   */
    484 485   public UserPresenceCommand(String name) {
    485  - super(name);
     486 + super(name, Priority.CLUSTER);
    486 487   }
    487 488   
    488 489   //~--- methods ------------------------------------------------------------
    skipped 74 lines
  • src/main/java/tigase/cluster/strategy/cmd/PacketForwardCmd.java
    ■ ■ ■ ■ ■
    skipped 52 lines
    53 53  import java.util.Map;
    54 54  import java.util.Queue;
    55 55  import java.util.Set;
     56 +import tigase.server.Priority;
    56 57   
    57 58  /**
    58 59   * Class description
    skipped 24 lines
    83 84   * @param strategy
    84 85   */
    85 86   public PacketForwardCmd(String name, DefaultClusteringStrategyAbstract strategy) {
    86  - super(name);
     87 + super(name, Priority.HIGH);
    87 88   this.strategy = strategy;
    88 89   }
    89 90   
    skipped 84 lines
  • src/test/java/tigase/cluster/ClusterConnectionSelectorTest.java
    ■ ■ ■ ■ ■ ■
     1 +/*
     2 + * ClusterConnectionSelectorTest.java
     3 + *
     4 + * Tigase Jabber/XMPP Server
     5 + * Copyright (C) 2004-2015 "Tigase, Inc." <office@tigase.com>
     6 + *
     7 + * This program is free software: you can redistribute it and/or modify
     8 + * it under the terms of the GNU Affero General Public License as published by
     9 + * the Free Software Foundation, either version 3 of the License,
     10 + * or (at your option) any later version.
     11 + *
     12 + * This program is distributed in the hope that it will be useful,
     13 + * but WITHOUT ANY WARRANTY; without even the implied warranty of
     14 + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
     15 + * GNU Affero General Public License for more details.
     16 + *
     17 + * You should have received a copy of the GNU Affero General Public License
     18 + * along with this program. Look for COPYING file in the top folder.
     19 + * If not, see http://www.gnu.org/licenses/.
     20 + *
     21 + */
     22 +package tigase.cluster;
     23 + 
     24 +import java.util.Arrays;
     25 +import java.util.HashMap;
     26 +import java.util.HashSet;
     27 +import java.util.Map;
     28 +import java.util.Set;
     29 +import junit.framework.TestCase;
     30 +import org.junit.Test;
     31 +import static tigase.cluster.ClusterConnectionSelector.CLUSTER_SYS_CONNECTIONS_PER_NODE_PROP_KEY;
     32 +import tigase.cluster.api.ClusterConnectionHandler;
     33 +import tigase.server.Packet;
     34 +import tigase.server.Priority;
     35 +import tigase.xml.Element;
     36 +import tigase.xmpp.XMPPIOService;
     37 + 
     38 +/**
     39 + *
     40 + * @author andrzej
     41 + */
     42 +public class ClusterConnectionSelectorTest extends TestCase {
     43 + // Default configuration: the selector reserves a single connection for system traffic
     44 + @Test
     45 + public void testSelectConnection() throws Exception {
     46 + ClusterConnection conn = new ClusterConnection("test");
     47 + ClusterConnectionSelector selector = new ClusterConnectionSelector();
     48 + selector.setClusterConnectionHandler(new ClusterConnectionHandler() {
     49 + 
     50 + @Override
     51 + public int hashCodeForPacket(Packet packet) {
     52 + return packet.getStanzaFrom().hashCode();
     53 + }
     54 + });
     55 + // No connections registered yet, so nothing can be selected
     56 + Element el = new Element("iq", new String[] { "from" }, new String[] { "test1" });
     57 + Packet p = Packet.packetInstance(el);
     58 + assertNull(selector.selectConnection(p, conn));
     59 + // A single connection carries every packet regardless of priority
     60 + XMPPIOService<Object> serv1 = new XMPPIOService<Object>();
     61 + conn.addConn(serv1);
     62 + assertEquals(serv1, selector.selectConnection(p, conn));
     63 +
     64 + p.setPriority(Priority.SYSTEM);
     65 + assertEquals(serv1, selector.selectConnection(p, conn));
     66 + // Two connections: packets without priority use the second, SYSTEM packets the first
     67 + p.setPriority(null);
     68 + XMPPIOService<Object> serv2 = new XMPPIOService<Object>();
     69 + conn.addConn(serv2);
     70 + assertEquals(2, conn.size());
     71 + assertEquals(serv2, selector.selectConnection(p, conn));
     72 +
     73 + p.setPriority(Priority.SYSTEM);
     74 + assertEquals(serv1, selector.selectConnection(p, conn));
     75 + // Three connections: packets without priority must not use the first connection
     76 + p.setPriority(null);
     77 + XMPPIOService<Object> serv3 = new XMPPIOService<Object>();
     78 + conn.addConn(serv3);
     79 + assertEquals(3, conn.size());
     80 + assertNotSame(serv1, selector.selectConnection(p, conn));
     81 +
     82 + p.setPriority(Priority.SYSTEM);
     83 + assertEquals(serv1, selector.selectConnection(p, conn));
     84 + // Packets from other senders (different hash codes) must also avoid the first connection
     85 + el = new Element("iq", new String[] { "from" }, new String[] { "test2" });
     86 + p = Packet.packetInstance(el);
     87 + assertEquals(3, conn.size());
     88 + assertNotSame(serv1, selector.selectConnection(p, conn));
     89 + 
     90 + el = new Element("iq", new String[] { "from" }, new String[] { "test3" });
     91 + p = Packet.packetInstance(el);
     92 + assertEquals(3, conn.size());
     93 + assertNotSame(serv1, selector.selectConnection(p, conn));
     94 + 
     95 + el = new Element("iq", new String[] { "from" }, new String[] { "test4" });
     96 + p = Packet.packetInstance(el);
     97 + assertEquals(3, conn.size());
     98 + assertNotSame(serv1, selector.selectConnection(p, conn));
     99 + }
     100 + // Same scenario with two connections reserved via CLUSTER_SYS_CONNECTIONS_PER_NODE_PROP_KEY
     101 + @Test
     102 + public void testSelectConnectionFor2() throws Exception {
     103 + ClusterConnection conn = new ClusterConnection("test");
     104 + ClusterConnectionSelector selector = new ClusterConnectionSelector();
     105 + selector.setClusterConnectionHandler(new ClusterConnectionHandler() {
     106 + 
     107 + @Override
     108 + public int hashCodeForPacket(Packet packet) {
     109 + return packet.getStanzaFrom().hashCode();
     110 + }
     111 + });
     112 + Map<String,Object> props = new HashMap<>();
     113 + props.put(CLUSTER_SYS_CONNECTIONS_PER_NODE_PROP_KEY, 2);
     114 + selector.setProperties(props);
     115 + // No connections registered yet, so nothing can be selected
     116 + Element el = new Element("iq", new String[] { "from" }, new String[] { "test1" });
     117 + Packet p = Packet.packetInstance(el);
     118 + assertNull(selector.selectConnection(p, conn));
     119 + // A single connection carries every packet regardless of priority
     120 + XMPPIOService<Object> serv1 = new XMPPIOService<Object>();
     121 + conn.addConn(serv1);
     122 + assertEquals(serv1, selector.selectConnection(p, conn));
     123 +
     124 + p.setPriority(Priority.SYSTEM);
     125 + assertEquals(serv1, selector.selectConnection(p, conn));
     126 + // Two connections with two reserved: both are shared by all packets
     127 + p.setPriority(null);
     128 + XMPPIOService<Object> serv2 = new XMPPIOService<Object>();
     129 + conn.addConn(serv2);
     130 + Set<XMPPIOService<Object>> sysServs = new HashSet<>(Arrays.asList(serv1, serv2));
     131 + assertEquals(2, conn.size());
     132 + assertTrue(sysServs.contains(selector.selectConnection(p, conn)));
     133 +
     134 + p.setPriority(Priority.SYSTEM);
     135 + assertTrue(sysServs.contains(selector.selectConnection(p, conn)));
     136 + // Three connections: packets without priority use only the third, SYSTEM packets the first two
     137 + p.setPriority(null);
     138 + XMPPIOService<Object> serv3 = new XMPPIOService<Object>();
     139 + conn.addConn(serv3);
     140 + assertEquals(3, conn.size());
     141 + assertSame(serv3, selector.selectConnection(p, conn));
     142 +
     143 + p.setPriority(Priority.SYSTEM);
     144 + assertTrue(sysServs.contains(selector.selectConnection(p, conn)));
     145 + // Packets from other senders must also be sent over the third connection
     146 + el = new Element("iq", new String[] { "from" }, new String[] { "test2" });
     147 + p = Packet.packetInstance(el);
     148 + assertEquals(3, conn.size());
     149 + assertSame(serv3, selector.selectConnection(p, conn));
     150 + 
     151 + el = new Element("iq", new String[] { "from" }, new String[] { "test3" });
     152 + p = Packet.packetInstance(el);
     153 + assertEquals(3, conn.size());
     154 + assertSame(serv3, selector.selectConnection(p, conn));
     155 + 
     156 + el = new Element("iq", new String[] { "from" }, new String[] { "test4" });
     157 + p = Packet.packetInstance(el);
     158 + assertEquals(3, conn.size());
     159 + assertSame(serv3, selector.selectConnection(p, conn));
     160 + }
     161 + 
     162 +}
     163 + 
Please wait...
Page is in error, reload to recover