Recommendation Engine Using Apache Mahout & Cassandra

10 / Sep / 2014 by Pavan

As part of our R&D, we are building user recommendations with Apache Mahout and Cassandra. Mahout offers a direct Cassandra DataModel, CassandraDataModel, which is based on a Cassandra keyspace, but we found it confusing because it involves so many methods and functions. We therefore tried to minimize the complexity by using Cassandra indexes together with Mahout's FileDataModel.

We have 1 million records in a Cassandra database, in a keyspace named testkeyspace with two tables. Their schemas are:

CREATE TABLE SampleUser (
user_id bigint,
ratings float,
item_id bigint,
PRIMARY KEY (user_id, ratings, item_id)
);

CREATE TABLE SampleItems (
item_id bigint,
user_id bigint,
ratings float,
PRIMARY KEY (item_id, user_id)
);

We want to provide user-based and item-based recommendations with the help of similar users (for example, based on the ratings given by other users in a common locality, or on some other similarity).

They are:

1) Customers Who Bought This Item Also Bought
2) Customers Who Viewed This Item Also Viewed
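To make the "also bought" idea concrete, here is a minimal plain-Java sketch that counts how often other items appear alongside a target item in the same user's purchases. It is only an illustration under assumptions: the class name AlsoBoughtSketch, the method alsoBought, and the in-memory map of user IDs to item IDs are hypothetical and are not part of Mahout or of the code in this post.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not a Mahout class: simple co-occurrence counting.
class AlsoBoughtSketch {

    /**
     * For every user who has the target item, counts each other item
     * that the same user also has.
     */
    static Map<Long, Integer> alsoBought(Map<Long, Set<Long>> itemsByUser, long targetItem) {
        Map<Long, Integer> counts = new HashMap<>();
        for (Set<Long> items : itemsByUser.values()) {
            if (!items.contains(targetItem)) {
                continue; // this user never had the target item
            }
            for (long other : items) {
                if (other != targetItem) {
                    counts.merge(other, 1, Integer::sum);
                }
            }
        }
        return counts;
    }
}
```

Sorting the resulting counts in descending order would give a rough "also bought" list for the target item.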

The Java code for Mahout recommendations from a Cassandra database:

[html]
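import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.List;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class RecommendationsFromCassandra {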
private Cluster cluster;
private Session session;

/*
* Connection to Cassandra Database
*/

public void connect(String node) {
cluster = Cluster.builder().addContactPoint(node).build();
Metadata metadata = cluster.getMetadata();
session = cluster.connect();
}

/*
* Run the first query to get the IDs of all users who have a particular
* item (here item_id = 914)
*/

public void query1() throws IOException {
String userids = null;
ResultSet results = session
.execute("SELECT user_id FROM testkeyspace.sampleitems WHERE item_id = 914");
for (Row row : results)
{
if (userids == null)
userids = Long.toString(row.getLong("user_id"));
else
userids = userids + "," + row.getLong("user_id");
}

/*
* Pass all of those user IDs to the second query to fetch every item ID
* and rating for each of those users
*/

String stmt = "SELECT user_id,item_id,ratings FROM testkeyspace.sampleuser WHERE user_id in ("
+ userids + ") ;";

Statement s1 = new SimpleStatement(stmt);
s1.setFetchSize(Integer.MAX_VALUE);

ResultSet results1 = session.execute(s1);
// "${env:HOME}" is not expanded inside a Java string literal,
// so resolve the home directory explicitly
File newTextFile = new File(System.getProperty("user.home"), "thetextfile.txt");

// try-with-resources closes the writer even if a write fails
try (FileWriter fw = new FileWriter(newTextFile)) {
for (Row row2 : results1) {
fw.write(row2.getLong("user_id") + "\t" + row2.getLong("item_id")
+ "\t" + row2.getFloat("ratings") + "\n");
}
}

/*
* Recommendation Part
*/

UserSimilarity similarity;
try {
DataModel datamodel = new FileDataModel(newTextFile);
similarity = new PearsonCorrelationSimilarity(datamodel);
UserNeighborhood neighbourhood = new NearestNUserNeighborhood(100,
similarity, datamodel);

Recommender recommender = new GenericUserBasedRecommender(
datamodel, neighbourhood, similarity);
long start = System.currentTimeMillis();
List<RecommendedItem> recommendations = recommender.recommend(10,
10);
for (RecommendedItem recommendation : recommendations) {
System.out.println(recommendation);

}
long stop = System.currentTimeMillis();
System.out.println("Took: " + (stop - start) + " millis");

} catch (TasteException e) {
e.printStackTrace();
}

}

public void close() {
cluster.close();
}

public static void main(String[] args) throws IOException {
RecommendationsFromCassandra client = new RecommendationsFromCassandra();
client.connect("127.0.0.1");
client.query1();
// CassandraRecommender r = new CassandraRecommender();
client.close();
File file = new File(System.getProperty("user.home"), "thetextfile.txt");
file.delete();
}
}
[/html]
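The intermediate file that the code above hands to FileDataModel holds one preference per line in the form user ID, item ID, rating, separated by tabs. As a hedged, standalone sketch of just that step, the snippet below formats rows that way and writes them to a temporary file using only the Java standard library. The class name RatingsFileSketch and the methods toLine and writeRatings are hypothetical names chosen for illustration.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

// Hypothetical helper for illustration; not part of Mahout or the Cassandra driver.
class RatingsFileSketch {

    /** Formats one preference as "userID<TAB>itemID<TAB>rating". */
    static String toLine(long userId, long itemId, float rating) {
        return userId + "\t" + itemId + "\t" + rating;
    }

    /** Writes the given lines to a fresh temporary file and returns its path. */
    static Path writeRatings(List<String> lines) throws IOException {
        Path file = Files.createTempFile("ratings", ".txt");
        // try-with-resources closes the writer even if a write fails
        try (BufferedWriter out = Files.newBufferedWriter(file, StandardCharsets.UTF_8)) {
            for (String line : lines) {
                out.write(line);
                out.newLine();
            }
        }
        return file;
    }
}
```

Writing to a temporary file avoids hard-coding a location under the home directory, and the returned path can be converted with toFile() wherever a java.io.File is expected.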


The time taken to generate recommendations for a particular user is 2.40 seconds for 1 million records.
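The recommender above relies on PearsonCorrelationSimilarity to decide which users are similar. As a rough illustration of the underlying idea only, the following plain-Java sketch computes the Pearson correlation between two users' ratings for the same items. It is not Mahout's implementation; the class name PearsonSketch and the method pearson are hypothetical, and the two arrays are assumed to hold ratings for the same items in the same order.

```java
// Hypothetical illustration of Pearson correlation; not Mahout's code.
class PearsonSketch {

    /**
     * Returns the Pearson correlation of x and y, a value in [-1, 1],
     * or NaN when either series has no variance.
     */
    static double pearson(double[] x, double[] y) {
        if (x.length != y.length || x.length < 2) {
            throw new IllegalArgumentException("need two equally long arrays with at least 2 values");
        }
        int n = x.length;
        double meanX = 0.0;
        double meanY = 0.0;
        for (int i = 0; i < n; i++) {
            meanX += x[i];
            meanY += y[i];
        }
        meanX /= n;
        meanY /= n;

        double cov = 0.0;   // sum of products of deviations
        double varX = 0.0;  // sum of squared deviations of x
        double varY = 0.0;  // sum of squared deviations of y
        for (int i = 0; i < n; i++) {
            double dx = x[i] - meanX;
            double dy = y[i] - meanY;
            cov += dx * dy;
            varX += dx * dx;
            varY += dy * dy;
        }
        double denom = Math.sqrt(varX * varY);
        return denom == 0.0 ? Double.NaN : cov / denom;
    }
}
```

Two users whose ratings rise and fall together score close to 1, while users with opposite tastes score close to -1; a neighborhood such as the 100 nearest users is then picked from the highest-scoring users.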
