Implémentation complète de l'ordonnancement.
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent, ManagePortal
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
# Name of the attribute under which the Solr connection is cached on the
# catalog tool for the duration of a transaction.
_VOLATILE_SOLR_NAME = '_v_solrConnection'


class SolrTransactionHook:
    """After-commit hook that ties the Solr commit to the ZODB commit.

    ``context`` is the object carrying the cached connection attribute and
    ``con`` is the Solr connection that was opened for the transaction.
    """

    def __init__(self, context, con):
        self.context = context
        self.con = con

    def __call__(self, status):
        """Finish the Solr side of the transaction.

        ``status`` is true when the ZODB commit succeeded; only then are the
        pending Solr changes committed. Whatever happens, the connection is
        closed and the cached attribute is removed from ``context`` so that
        a stale connection is never reused. An error raised by the Solr
        commit (or close) still propagates to the caller.
        """
        try:
            if status:
                self.con.commit()
        finally:
            # Cleanup must run even when the Solr commit fails, otherwise
            # the connection leaks and stays cached on the context.
            try:
                self.con.close()
            finally:
                try:
                    delattr(self.context, _VOLATILE_SOLR_NAME)
                except AttributeError:
                    # Attribute already gone: nothing left to clean up.
                    pass
39
class CatalogTool(BaseCatalogTool):
    """CMF catalog tool that also feeds a Solr server.

    The indexes listed in ``delegatedIndexes`` are pushed to Solr whenever an
    object is (re)cataloged and removed from Solr when it is unindexed.
    ``_catalog`` is replaced by a ``DelegatedCatalog`` so that searches on
    those indexes are answered by Solr.
    """

    meta_type = 'Plinn Catalog'
    security = ClassSecurityInfo()
    # Insert a "Solr" management tab after the first five inherited tabs.
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label' : 'Solr', 'action' : 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr.pt', globals(),
                                   __name__='manage_solr')

    # NOTE: ``idxs`` is accepted but not used; its default is never mutated.
    def __init__(self, idxs=[]):
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    security.declarePublic('getDelegatedIndexes')
    def getDelegatedIndexes(self):
        """Return the tuple of index names whose searches go to Solr."""
        return self.delegatedIndexes

    security.declareProtected(ManagePortal, 'setDelegatedIndexes')
    def setDelegatedIndexes(self, indexes, REQUEST=None):
        """Store the delegated index names.

        ``indexes`` is an iterable of names; surrounding whitespace is
        stripped and blank entries are dropped. When a ``REQUEST`` is given
        the browser is redirected back to the Solr management tab.
        """
        self.delegatedIndexes = tuple([i.strip() for i in indexes if i.strip()])
        if REQUEST:
            REQUEST.RESPONSE.redirect(self.absolute_url() + '/manage_solr?manage_tabs_message=Saved changes.')

    def _getSolrConnection(self):
        """Return the Solr connection cached for the current transaction.

        On first use a connection is opened, cached on ``self`` under
        ``_VOLATILE_SOLR_NAME`` and a ``SolrTransactionHook`` is registered
        as an after-commit hook of the current transaction.
        """
        if not hasattr(self, _VOLATILE_SOLR_NAME):
            con = SolrConnection(self.solr_url)
            setattr(self, _VOLATILE_SOLR_NAME, con)
            txn = transaction.get()
            txn.addAfterCommitHook(SolrTransactionHook(self, con))
        return getattr(self, _VOLATILE_SOLR_NAME)

    security.declarePrivate('solrAdd')
    def solrAdd(self, w, uid, idxs):
        """Send the delegated fields of ``w`` to Solr under the id ``uid``.

        ``idxs`` is the list of indexes being updated; an empty or ``None``
        value means "all of them". If ``idxs`` names no delegated index,
        Solr is left untouched. Otherwise *every* delegated field is sent:
        adding a document to Solr replaces the stored document with the same
        id, so sending a subset would erase the other fields.
        """
        delegated = self.delegatedIndexes
        if idxs and not [i for i in idxs if i in delegated]:
            # Partial reindex that concerns no delegated index: posting a
            # document holding only the id would wipe the Solr record.
            return
        data = {'id' : uid}
        for name in delegated:
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr
        c = self._getSolrConnection()
        c.add(**data)

    # PortalCatalog api overloads
    def catalog_object(self, obj, uid=None, idxs=None, update_metadata=1,
                       pghandler=None):
        # No docstring on purpose: the original method had none, and adding
        # one could affect how Zope publishes the method.
        #
        # Wraps the object with workflow and accessibility
        # information just before cataloging.
        if IIndexableObject.providedBy(obj):
            w = obj
        else:
            w = queryMultiAdapter((obj, self), IIndexableObject)
            if w is None:
                # BBB
                w = IndexableObjectWrapper(obj, self)

        idxs_ = idxs
        if idxs:
            # Filter out invalid indexes.
            valid_indexes = self._catalog.indexes.keys()
            idxs_ = [i for i in idxs if i in valid_indexes]

        super(CatalogTool, self).catalog_object(w, uid, idxs_, update_metadata, pghandler)
        if uid is None:
            # Never hand ``None`` to Solr as a document id: fall back to the
            # physical path, as reindexObject/unindexObject do.
            uid = self.__url(obj)
        # The unfiltered list is passed on: solrAdd does its own filtering
        # against the delegated indexes.
        self.solrAdd(w, uid, idxs)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=[], update_metadata=1, uid=None):
        """Update catalog after object data has changed.

        The optional idxs argument is a list of specific indexes
        to update (all of them by default).

        The update_metadata flag controls whether the object's
        metadata record is updated as well.

        If a non-None uid is passed, it will be used as the catalog uid
        for the object instead of its physical path.
        """
        if uid is None:
            # ``__url`` is not defined in this class. The name is mangled to
            # ``_CatalogTool__url`` and is expected to resolve to the base
            # class's private helper, which works only because the base
            # class is also named ``CatalogTool``.
            uid = self.__url(object)

        self.catalog_object(object, uid, idxs, update_metadata)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove the object from the catalog and from Solr."""
        super(CatalogTool, self).unindexObject(object)
        c = self._getSolrConnection()
        url = self.__url(object)
        c.delete(id=url)

InitializeClass(CatalogTool)
138
139
class DelegatedCatalog(Catalog):
    """Catalog whose searches on the delegated indexes are answered by Solr."""

    def __init__(self, zcat, brains=None):
        Catalog.__init__(self, brains=brains)
        # The catalog tool; provides ``delegatedIndexes`` and ``solr_url``.
        self.zcat = zcat

    @staticmethod
    def _escapeSolrPhrase(value):
        """Return ``value`` as text safe to embed in a quoted Solr phrase."""
        text = '%s' % (value,)
        # Backslashes first, so the ones added for quotes are not doubled.
        return text.replace('\\', '\\\\').replace('"', '\\"')

    def delegateSearch(self, query, plan):
        """Run the delegated part of ``query`` against Solr.

        Entries of ``query`` whose key is a delegated index are removed from
        ``query`` (and from ``plan``) and combined with AND into a single
        Solr query.

        Falsy return values:
          None        : nothing was delegated; the other indexes must still
                        be queried.
          empty IISet : the delegated query matched nothing; the search can
                        stop.

        Otherwise returns an IISet of the record ids of the matching
        documents that are known to this catalog.
        """
        indexes = set(query.keys()).intersection(set(self.zcat.delegatedIndexes))
        if not indexes:
            return None
        delegatedQuery = {}
        for i in indexes:
            delegatedQuery[i] = query.pop(i)
            try:
                plan.remove(i)
            except ValueError:
                pass
        q = ' AND '.join(['%s:"%s"' % (name, self._escapeSolrPhrase(value))
                          for name, value in delegatedQuery.items()])
        c = SolrConnection(self.zcat.solr_url)
        try:
            resp = c.query(q, fields='id', rows=len(self))
        finally:
            # Close the connection even when the query fails.
            c.close()
        rids = [self.uids.get(r['id']) for r in resp.results]
        # Drop only unknown uids; a record id of 0 is kept.
        return IISet([rid for rid in rids if rid is not None])

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjunction with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual.

        Before the local indexes are consulted, the part of the query that
        concerns delegated indexes is handed to ``delegateSearch``; its
        result seeds the result set."""

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # Delegation: rs (the result set) is None when nothing was delegated.
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs:
            # The delegated query matched nothing: short circuit.
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                                                      b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                                 actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                    actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                              for rid, score in rs.items()]
                else:
                    # aq_parent is not imported at module level in this
                    # file; import it here so the getter below can use it.
                    from Acquisition import aq_parent

                    cr.start_split('sort_on')

                    rs = rs.byValue(0) # sort it by score
                    max = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max=max, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        r = self._v_result_class(self.data[key])\
                            .__of__(aq_parent(self))
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(100. * score / max)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                          b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                                     actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                      b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                                 actual_result_count=rlen)
            else:
                # sort. If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query. This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                                          merge, actual_result_count=rlen,
                                          b_start=b_start, b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result