Recommendation Engine Using Apache Mahout & Cassandra

10 / Sep / 2014 by Pavan

As part of our R&D, we are trying to build recommendations for users with Apache Mahout and Cassandra. Mahout offers a direct Cassandra data model, CassandraDataModel, which is backed by a Cassandra keyspace, but we found it confusing because it involves many methods and functions. We therefore tried to keep things minimal by using Cassandra indexes and Mahout's FileDataModel.

We have 1 million records in a Cassandra database, stored in the keyspace testkeyspace in two tables whose schemas are:

CREATE TABLE SampleUser (
    user_id bigint,
    ratings float,
    item_id bigint,
    PRIMARY KEY (user_id, ratings, item_id)
);

CREATE TABLE SampleItems (
    item_id bigint,
    user_id bigint,
    ratings float,
    PRIMARY KEY (item_id, user_id)
);
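The two tables hold the same ratings under different partition keys, so that "who rated this item?" and "what did this user rate?" can each be answered by key. The following is only an in-memory sketch of that two-step lookup, with plain Java maps standing in for the tables; the class name TwoStepLookup and its methods are hypothetical and are not part of Cassandra or Mahout.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class TwoStepLookup {
    /** One rating row: (user_id, item_id, ratings). */
    static final class Rating {
        final long userId;
        final long itemId;
        final float rating;

        Rating(long userId, long itemId, float rating) {
            this.userId = userId;
            this.itemId = itemId;
            this.rating = rating;
        }
    }

    // Stand-in for SampleUser: rows grouped by user_id.
    private final Map<Long, List<Rating>> byUser = new HashMap<>();
    // Stand-in for SampleItems: rows grouped by item_id.
    private final Map<Long, List<Rating>> byItem = new HashMap<>();

    /** Stores the same row under both keys, as the two tables do. */
    void add(long userId, long itemId, float rating) {
        Rating r = new Rating(userId, itemId, rating);
        byUser.computeIfAbsent(userId, k -> new ArrayList<>()).add(r);
        byItem.computeIfAbsent(itemId, k -> new ArrayList<>()).add(r);
    }

    /** Step 1: find users who rated the item. Step 2: collect all their ratings. */
    List<Rating> ratingsOfUsersWhoRated(long itemId) {
        Set<Long> users = new TreeSet<>();
        for (Rating r : byItem.getOrDefault(itemId, Collections.emptyList())) {
            users.add(r.userId);
        }
        List<Rating> result = new ArrayList<>();
        for (long user : users) {
            result.addAll(byUser.get(user));
        }
        return result;
    }

    public static void main(String[] args) {
        TwoStepLookup store = new TwoStepLookup();
        store.add(1L, 914L, 5.0f);
        store.add(1L, 10L, 3.0f);
        store.add(2L, 914L, 4.0f);
        store.add(3L, 10L, 2.0f);
        for (Rating r : store.ratingsOfUsersWhoRated(914L)) {
            System.out.println(r.userId + "\t" + r.itemId + "\t" + r.rating);
        }
    }
}
```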

We want to provide user-based and item-based recommendations with the help of similar users (for example, based on the ratings given by other users in a common locality, or users who are similar in some other way).

They are:

1) Customers Who Bought This Item Also Bought
2) Customers Who Viewed This Item Also Viewed
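As a rough illustration of the "also bought" idea, the sketch below counts how often other items occur together with a target item across users. It is a simple co-occurrence count in plain Java, not the Mahout recommender used in this post, and the class name AlsoBought, its methods, and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AlsoBought {
    /**
     * For every user who has the target item, counts each other item
     * that the same user also has.
     */
    static Map<Long, Integer> alsoBought(Map<Long, Set<Long>> itemsByUser, long targetItem) {
        Map<Long, Integer> counts = new HashMap<>();
        for (Set<Long> items : itemsByUser.values()) {
            if (!items.contains(targetItem)) {
                continue;
            }
            for (long other : items) {
                if (other != targetItem) {
                    counts.merge(other, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    /** Returns up to n item ids, highest count first, ties broken by item id. */
    static List<Long> topN(Map<Long, Integer> counts, int n) {
        List<Map.Entry<Long, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> {
            int byCount = Integer.compare(b.getValue(), a.getValue());
            return byCount != 0 ? byCount : Long.compare(a.getKey(), b.getKey());
        });
        List<Long> top = new ArrayList<>();
        for (int i = 0; i < Math.min(n, entries.size()); i++) {
            top.add(entries.get(i).getKey());
        }
        return top;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: user id -> items that user bought.
        Map<Long, Set<Long>> itemsByUser = new HashMap<>();
        itemsByUser.put(1L, new HashSet<>(Arrays.asList(914L, 10L, 20L)));
        itemsByUser.put(2L, new HashSet<>(Arrays.asList(914L, 10L)));
        itemsByUser.put(3L, new HashSet<>(Arrays.asList(20L, 30L)));

        Map<Long, Integer> counts = alsoBought(itemsByUser, 914L);
        System.out.println(topN(counts, 2));
    }
}
```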

The Java code for Mahout recommendations from the Cassandra database:

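import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;
import java.util.StringJoiner;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

public class RecommendationsFromCassandra {

    // Intermediate ratings file in the user's home directory.
    private static final File DATA_FILE =
            new File(System.getProperty("user.home"), "thetextfile.txt");

    private Cluster cluster;
    private Session session;

    /*
     * Connect to the Cassandra database
     */
    public void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        session = cluster.connect();
    }

    /*
     * Run the first query to get the ids of the users who have a particular item
     */
    public void query1() throws IOException {
        StringJoiner userids = new StringJoiner(",");
        ResultSet results = session
                .execute("SELECT user_id FROM testkeyspace.sampleitems WHERE item_id = 914");
        for (Row row : results) {
            userids.add(Long.toString(row.getLong("user_id")));
        }

        // Without any user ids the IN (...) clause below would be invalid CQL.
        if (userids.length() == 0) {
            System.out.println("No users found for this item");
            return;
        }

        /*
         * Pass all user ids to the second query to get those users with
         * their item ids and ratings
         */
        String stmt = "SELECT user_id,item_id,ratings FROM testkeyspace.sampleuser WHERE user_id in ("
                + userids + ") ;";

        Statement s1 = new SimpleStatement(stmt);
        // A very large fetch size effectively asks for the whole result in one page.
        s1.setFetchSize(Integer.MAX_VALUE);

        ResultSet results1 = session.execute(s1);

        // Write one "user_id<TAB>item_id<TAB>rating" line per row for FileDataModel.
        try (FileWriter fw = new FileWriter(DATA_FILE)) {
            for (Row row2 : results1) {
                fw.write(row2.getLong("user_id") + "\t" + row2.getLong("item_id")
                        + "\t" + row2.getFloat("ratings") + "\n");
            }
        }

        /*
         * Recommendation part
         */
        try {
            DataModel datamodel = new FileDataModel(DATA_FILE);
            UserSimilarity similarity = new PearsonCorrelationSimilarity(datamodel);
            UserNeighborhood neighbourhood = new NearestNUserNeighborhood(100,
                    similarity, datamodel);

            Recommender recommender = new GenericUserBasedRecommender(
                    datamodel, neighbourhood, similarity);
            long start = System.currentTimeMillis();
            // Up to 10 recommendations for user id 10; this user must be present
            // in the data file, otherwise a TasteException is reported below.
            List<RecommendedItem> recommendations = recommender.recommend(10, 10);
            for (RecommendedItem recommendation : recommendations) {
                System.out.println(recommendation);
            }
            long stop = System.currentTimeMillis();
            System.out.println("Took: " + (stop - start) + " millis");
        } catch (TasteException e) {
            e.printStackTrace();
        }
    }

    public void close() {
        cluster.close();
    }

    public static void main(String[] args) throws IOException {
        RecommendationsFromCassandra client = new RecommendationsFromCassandra();
        client.connect("127.0.0.1");
        client.query1();
        // CassandraRecommender r = new CassandraRecommender();
        client.close();
        // Remove the intermediate ratings file.
        DATA_FILE.delete();
    }
}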
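The recommendation part above ranks neighbouring users with PearsonCorrelationSimilarity. To give an idea of what such a similarity measures, here is a plain-Java sketch of the Pearson correlation between two users over the items both have rated. It is only an illustration and not Mahout's implementation; the class name PearsonSketch, the method pearson, and the sample ratings are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PearsonSketch {
    /**
     * Pearson correlation of two users' ratings, computed only over the
     * items that both users rated. Returns NaN when it is undefined
     * (fewer than two common items, or no variance in either user's ratings).
     */
    static double pearson(Map<Long, Double> a, Map<Long, Double> b) {
        List<Long> common = new ArrayList<>();
        for (Long item : a.keySet()) {
            if (b.containsKey(item)) {
                common.add(item);
            }
        }
        int n = common.size();
        if (n < 2) {
            return Double.NaN;
        }

        double meanA = 0.0;
        double meanB = 0.0;
        for (Long item : common) {
            meanA += a.get(item);
            meanB += b.get(item);
        }
        meanA /= n;
        meanB /= n;

        double cov = 0.0;
        double varA = 0.0;
        double varB = 0.0;
        for (Long item : common) {
            double da = a.get(item) - meanA;
            double db = b.get(item) - meanB;
            cov += da * db;
            varA += da * da;
            varB += db * db;
        }
        if (varA == 0.0 || varB == 0.0) {
            return Double.NaN;
        }
        return cov / Math.sqrt(varA * varB);
    }

    public static void main(String[] args) {
        // Hypothetical ratings: item id -> rating.
        Map<Long, Double> user1 = new HashMap<>();
        user1.put(1L, 1.0);
        user1.put(2L, 2.0);
        user1.put(3L, 3.0);

        Map<Long, Double> user2 = new HashMap<>();
        user2.put(1L, 2.0);
        user2.put(2L, 4.0);
        user2.put(3L, 6.0);

        System.out.println("similarity = " + pearson(user1, user2));
    }
}
```

Users whose ratings rise and fall together get a value close to 1, and users with opposite tastes get a value close to -1; a neighbourhood such as the one above then keeps the most similar users.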


The time taken to compute recommendations for a particular user is
2.40 sec for 1 million records.

 

 
