Projects tigase _server server-core Issues #776
When I use mongodb to save muc message, there can't add message to history. HistoryProvider is null (#776)
Closed
来恩 周 opened 8 years ago
Due Date
2017-03-14

2017-03-14 12:11:24.870 [in_7-muc] GroupchatMessageModule.addMessageToHistory() WARNING: Can't add message to history!

java.lang.NullPointerException

at tigase.muc.history.HistoryProviderMDBean.addMessage(HistoryProviderMDBean.java:88)

at tigase.muc.modules.GroupchatMessageModule.addMessageToHistory(GroupchatMessageModule.java:87)

at tigase.muc.modules.GroupchatMessageModule.process(GroupchatMessageModule.java:286)

at tigase.component.modules.StanzaProcessor.process(StanzaProcessor.java:56)

at tigase.component.modules.StanzaProcessor.processPacket(StanzaProcessor.java:77)

at tigase.component.AbstractKernelBasedComponent.processPacket(AbstractKernelBasedComponent.java:102)

at tigase.muc.MUCComponent.processPacket(MUCComponent.java:125)

at tigase.server.AbstractMessageReceiver$QueueListener.run(AbstractMessageReceiver.java:1513)

###################################

/*

  • MongoHistoryProvider.java

  • Tigase Jabber/XMPP Server - MongoDB support

  • Copyright (C) 2004-2016 "Tigase, Inc." office@tigase.com

  • This program is free software: you can redistribute it and/or modify

  • it under the terms of the GNU Affero General Public License as published by

  • the Free Software Foundation, either version 3 of the License,

  • or (at your option) any later version.

  • This program is distributed in the hope that it will be useful,

  • but WITHOUT ANY WARRANTY; without even the implied warranty of

  • MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the

  • GNU Affero General Public License for more details.

  • You should have received a copy of the GNU Affero General Public License

  • along with this program. Look for COPYING file in the top folder.

  • If not, see http://www.gnu.org/licenses/.

*/

package tigase.mongodb.muc;

import com.mongodb.MongoNamespace;

import com.mongodb.client.FindIterable;

import com.mongodb.client.MongoCollection;

import com.mongodb.client.MongoDatabase;

import com.mongodb.client.model.Filters;

import org.bson.Document;

import org.bson.conversions.Bson;

import org.bson.types.Binary;

import tigase.component.PacketWriter;

import tigase.component.exceptions.ComponentException;

import tigase.db.Repository;

import tigase.db.TigaseDBException;

import tigase.kernel.beans.config.ConfigField;

import tigase.mongodb.MongoDataSource;

import tigase.mongodb.RepositoryVersionAware;

import tigase.muc.Affiliation;

import tigase.muc.Room;

import tigase.muc.RoomConfig;

import tigase.muc.history.AbstractHistoryProvider;

import tigase.server.Packet;

import tigase.util.TigaseStringprepException;

import tigase.xml.Element;

import tigase.xmpp.Authorization;

import tigase.xmpp.BareJID;

import tigase.xmpp.JID;

import tigase.xmpp.mam.MAMRepository;

import tigase.xmpp.mam.Query;

import tigase.xmpp.mam.QueryImpl;

import java.nio.charset.Charset;

import java.security.MessageDigest;

import java.security.NoSuchAlgorithmException;

import java.util.*;

import java.util.logging.Level;

import static com.mongodb.client.model.Accumulators.first;

import static com.mongodb.client.model.Aggregates.group;

import static tigase.mongodb.Helper.collectionExists;

/**

  • @author andrzej

*/

@Repository.Meta( supportedUris = { "mongodb:.*" } )

public class MongoHistoryProvider

	extends AbstractHistoryProvider<MongoDataSource>

	implements RepositoryVersionAware, MAMRepository {

private static final int DEF_BATCH_SIZE = 100;

private static final String HASH_ALG = "SHA-256";

private static final String HISTORY_COLLECTION = "tig_muc_room_history";

private static final String HISTORY_COLLECTION_OLD = "muc_history";

private static final Charset UTF8 = Charset.forName("UTF-8");


private MongoDatabase db;

protected MongoCollection<Document> historyCollection;


@ConfigField(desc = "Batch size", alias = "batch-size")

private int batchSize = DEF_BATCH_SIZE;


protected byte[] generateId(BareJID user) throws TigaseDBException {

	return calculateHash(user.toString().toLowerCase());

}


protected byte[] calculateHash(String user) throws TigaseDBException {

	try {

		MessageDigest md = MessageDigest.getInstance(HASH_ALG);

		return md.digest(user.getBytes(UTF8));

	} catch (NoSuchAlgorithmException ex) {

		throw new TigaseDBException("Should not happen!!", ex);

	}

}


@Override

public void addJoinEvent(Room room, Date date, JID senderJID, String nickName) {

}


@Override

public void addLeaveEvent(Room room, Date date, JID senderJID, String nickName) {

}


@Override

public void addMessage(Room room, Element message, String body, JID senderJid, String senderNickname, Date time) {

	try {

		byte[] rid = generateId(room.getRoomJID());

		Document dto = new Document("room_jid_id", rid).append("room_jid", room.getRoomJID().toString())

				.append("event_type", 1)

				.append("sender_jid", senderJid.toString()).append("sender_nickname", senderNickname)

				.append("body", body).append("public_event", room.getConfig().isLoggingEnabled());

		if (time != null) {

			dto.append("timestamp", time);

		}

		if (message != null) {

			dto.append("msg", message.toString());

		}

		historyCollection.insertOne(dto);

	} catch (Exception ex) {

		log.log(Level.WARNING, "Can't add MUC message to database", ex);

		throw new RuntimeException(ex);

	}

}


@Override

public void addSubjectChange(Room room, Element message, String subject, JID senderJid, String senderNickname, Date time) {

}


@Override

public void destroy() {

}


@Override

public void getHistoryMessages(Room room, JID senderJID, Integer maxchars, Integer maxstanzas, Integer seconds, Date since, PacketWriter writer) {

	Affiliation recipientAffiliation = room.getAffiliation(senderJID.getBareJID());

	boolean addRealJids = room.getConfig().getRoomAnonymity() == RoomConfig.Anonymity.nonanonymous

			|| room.getConfig().getRoomAnonymity() == RoomConfig.Anonymity.semianonymous

			&& (recipientAffiliation == Affiliation.owner || recipientAffiliation == Affiliation.admin);


	try {

		byte[] rid = generateId(room.getRoomJID());

		int maxMessages = room.getConfig().getMaxHistory();

		int limit = maxstanzas != null ? Math.min(maxMessages, maxstanzas) : maxMessages;

		if (since == null && seconds != null && maxstanzas == null) {

			since = new Date(new Date().getTime() - seconds * 1000);

		}

		
		Document crit = new Document("room_jid_id", rid);

		if (since != null) {

			crit.append("timestamp", new Document("$gte", since));

			Document order = new Document("timestamp", 1);

			FindIterable<Document> cursor = historyCollection.find(crit).batchSize(batchSize).limit(limit).sort(order);

			for (Document dto : cursor) {

				Packet packet = createMessage(room.getRoomJID(), senderJID, dto, addRealJids);

				writer.write(packet);

			}

		} else {

			Document order = new Document("timestamp", -1);

			FindIterable<Document> cursor = historyCollection.find(crit).batchSize(batchSize).limit(limit).sort(order);

			List<Packet> results = new ArrayList<Packet>();

			for (Document dto : cursor) {

				Packet packet = createMessage(room.getRoomJID(), senderJID, dto, addRealJids);

				results.add(packet);

			}

			Collections.reverse(results);

			writer.write(results);

		}

	} catch (Exception ex) {

		if (log.isLoggable(Level.SEVERE))

			log.log(Level.SEVERE, "Can't get history", ex);

		throw new RuntimeException(ex);

	}

}


@Override

public boolean isPersistent(Room room) {

	return true;

}


@Override

public void removeHistory(Room room) {

	try {

		byte[] rid = generateId(room.getRoomJID());

		Document crit = new Document("room_jid_id", rid);

		db.getCollection(HISTORY_COLLECTION).deleteMany(crit);

	} catch (Exception ex) {

		if (log.isLoggable(Level.SEVERE))

			log.log(Level.SEVERE, "Can't remove history", ex);

		throw new RuntimeException(ex);

	}		

}


@Override

public void setDataSource(MongoDataSource dataSource) {

	db = dataSource.getDatabase();


	if (!collectionExists(db, HISTORY_COLLECTION)) {

		if (collectionExists(db, HISTORY_COLLECTION_OLD)) {

			db.getCollection(HISTORY_COLLECTION_OLD).renameCollection(new MongoNamespace(db.getName(), HISTORY_COLLECTION));

		} else {

			db.createCollection(HISTORY_COLLECTION);

		}

	}

	historyCollection = db.getCollection(HISTORY_COLLECTION);


	historyCollection.createIndex(new Document("room_jid_id", 1));

	historyCollection.createIndex(new Document("room_jid_id", 1).append("timestamp", 1));

}


@Override

public void updateSchema() throws TigaseDBException {

	List<Bson> aggregationQuery = Arrays.asList(group("$room_jid_id", first("room_jid", "$room_jid")));

	for (Document doc : historyCollection.aggregate(aggregationQuery).batchSize(100)) {

		String roomJid = (String) doc.get("room_jid");


		byte[] oldRoomJidId = ((Binary) doc.get("_id")).getData();

		byte[] newRoomJidId = calculateHash(roomJid.toString().toLowerCase());


		if (Arrays.equals(oldRoomJidId, newRoomJidId)) {

			continue;

		}


		historyCollection.updateMany(new Document("room_jid_id", oldRoomJidId),

									 new Document("$set", new Document("room_jid_id", newRoomJidId)));

	}

}


private Long getItemPosition(String msgId, Bson filter) throws ComponentException {

	if (msgId == null) {

		return null;

	}

	try {

		Date ts = new Date(Long.parseLong(msgId));


		return historyCollection.count(Filters.and(filter, Filters.lt("timestamp", ts)));

	} catch (NumberFormatException ex) {

		throw new ComponentException(Authorization.ITEM_NOT_FOUND, "Not found message with id = " + msgId);

	}

}


@Override

public void queryItems(Query query, ItemHandler itemHandler) throws TigaseDBException, ComponentException {

	try {

		byte[] rid = generateId(query.getComponentJID().getBareJID());


		List<Bson> filters = new ArrayList<>();

		filters.add(Filters.eq("room_jid_id", rid));


		if (query.getStart() != null) {

			filters.add(Filters.gte("timestamp", query.getStart()));

		}

		if (query.getEnd() != null) {

			filters.add(Filters.lte("timestamp", query.getEnd()));

		}

		if (query.getWith() != null) {

			filters.add(Filters.eq("sender_nickname", query.getWith().toString()));

		}


		Bson filter = Filters.and(filters);

		long count = historyCollection.count(filter);


		Long after = getItemPosition(query.getRsm().getAfter(), filter);

		Long before = getItemPosition(query.getRsm().getBefore(), filter);


		AbstractHistoryProvider.calculateOffsetAndPosition(query, (int) count, before == null ? null : before.intValue(), after == null ? null : after.intValue());


		Document order = new Document("timestamp", 1);

		FindIterable<Document> cursor = historyCollection.find(filter).sort(order).skip(query.getRsm().getIndex()).limit(query.getRsm().getMax());

		for (Document dto : cursor) {

			String sender_nickname = (String) dto.get("sender_nickname");

			String msg = (String) dto.get("msg");

			String body = (String) dto.get("body");

			Date timestamp = (Date) dto.get("timestamp");


			Element msgEl = createMessageElement(query.getComponentJID().getBareJID(), query.getQuestionerJID(), sender_nickname, msg, body);

			Item item = new Item() {

				@Override

				public String getId() {

					return String.valueOf(timestamp.getTime());

				}


				@Override

				public Element getMessage() {

					return msgEl;

				}


				@Override

				public Date getTimestamp() {

					return timestamp;

				}

			};

			itemHandler.itemFound(query, item);

		}

	} catch (Exception ex) {

		if (log.isLoggable(Level.SEVERE))

			log.log(Level.SEVERE, "Can't get history", ex);

		throw new RuntimeException(ex);

	}


}


@Override

public Query newQuery() {

	return new QueryImpl();

}



private Packet createMessage(BareJID roomJid, JID senderJID, Document dto, boolean addRealJids) throws TigaseStringprepException {

	String sender_nickname = (String) dto.get("sender_nickname");

	String msg = (String) dto.get("msg");

	String body = (String) dto.get("body");

	String sender_jid = (String) dto.get("sender_jid");

	Date timestamp = (Date) dto.get("timestamp");

	
	return createMessage(roomJid, senderJID, sender_nickname, msg, body, sender_jid, addRealJids, timestamp);

}

}

来恩 周 commented 8 years ago

I have already solved the problem when I configure init.properties file like

muc (class: tigase.muc.MUCComponent) {

admin (class: tigase.muc.modules.ModeratorModule3) {}

disco (class: tigase.muc.modules.DiscoveryModule3) {}

groupchat (class: tigase.muc.modules.GroupchatMessageModule3) {}

presences (class: tigase.muc.modules.PresenceModuleImpl3) {}

historyProviderPool (class: tigase.muc.history.HistoryProviderMDBean) {

	default (class: tigase.muc.history.HistoryProviderMDBean.HistoryProviderConfigBean) {

		name = 'mucHistoryProvider' 

    	'repo-class' = 'tigase.mongodb.muc.MongoHistoryProvider'

		instance (class: tigase.mongodb.muc.MongoHistoryProvider) {}

	}

}

}

Andrzej Wójcik (Tigase) commented 8 years ago

I've found and fixed root cause of this issue. Additionally I also fixed few issues found during testing Tigase XMPP Server 7.2.0-SNAPSHOT with MongoDB.

This issue was caused by failure of automatic discovery of history provided implementation for MongoDB, which could be replaced with manual configuration (what you mentioned in comment above).

Next snapshot build will contain my fix for this issue.

来恩 周 commented 7 years ago

Thank you.

wojciech.kapcia@tigase.net commented 7 years ago

来恩 周 wrote:

Thank you.

Assuming resolved.

Referenced from commit 11 months ago
#776 - NPE while setting s2s/command/... in init properties
git-svn-id: file:///home/svn/repos/tigase-server/trunk@2962 7d282ba1-3ae6-0310-8f9b-c9008a0864d2
wojtek committed 1 decade ago
issue 1 of 1
Type
Bug
Priority
Normal
Assignee
RedmineID
5066
Version
tigase-server-8.0.0
Spent time
21h 15m
Issue Votes (0)
Watchers (0)
Reference
tigase/_server/server-core#776
Please wait...
Page is in error, reload to recover