d353d6d2b7473b5c557acfb71dde0af4856f76a3
[Plinn.git] / catalog.py
1 # -*- coding: utf-8 -*-
2 from App.class_init import InitializeClass
3 from AccessControl import ClassSecurityInfo
4 from Products.CMFCore.interfaces import IIndexableObject
5 from Products.CMFCore.CatalogTool import CatalogTool as BaseCatalogTool
6 from Products.CMFCore.CatalogTool import IndexableObjectWrapper
7 from Products.PageTemplates.PageTemplateFile import PageTemplateFile
8 from Products.CMFCore.permissions import ModifyPortalContent
9 from zope.component import queryMultiAdapter
10 from Products.ZCatalog.Catalog import Catalog
11 import transaction
12 from solr import *
13
14 # imports for Catalog class
15 from Products.PluginIndexes.interfaces import ILimitedResultIndex
16 from Products.ZCatalog.Lazy import LazyMap, LazyCat, LazyValues
17 from BTrees.IIBTree import intersection, IISet
18 from BTrees.IIBTree import weightedIntersection
19 import warnings
20
class SolrTransactionHook:
    """Solr commit coupled to the ZODB commit.

    Instances are registered as after-commit hooks on the current
    transaction; each one owns a single Solr connection and is responsible
    for closing it.
    """

    def __init__(self, connection):
        """Remember the Solr connection to finalise.

        connection -- object exposing ``commit()`` and ``close()``.
        """
        self.connection = connection

    def __call__(self, status):
        """Finalise the Solr connection once the transaction is over.

        status -- flag handed to the hook by the transaction machinery;
                  pending Solr changes are committed only when it is true.

        The connection is closed in every case, including when the Solr
        commit itself raises (the exception still propagates to the caller).
        """
        try:
            if status:
                self.connection.commit()
        finally:
            # Always release the connection, even if commit() failed.
            self.connection.close()
32
class CatalogTool(BaseCatalogTool):
    """Portal catalog that mirrors a few indexes into a Solr server.

    The indexes named in ``delegatedIndexes`` are sent to the Solr instance
    at ``solr_url`` whenever an object is (re)indexed, and removed from it
    when the object is unindexed.  Solr changes are committed by a
    ``SolrTransactionHook`` registered on the current transaction.
    """

    meta_type = 'Legivoc Catalog'
    security = ClassSecurityInfo()
    # Insert a "Solr" management tab after the first five inherited tabs.
    manage_options = (BaseCatalogTool.manage_options[:5] +
                      ({'label': 'Solr', 'action': 'manage_solr'},) +
                      BaseCatalogTool.manage_options[5:])
    manage_solr = PageTemplateFile('www/manage_solr', globals())

    # NOTE: ``idxs`` is accepted for signature compatibility only; it is
    # neither read nor mutated, so the shared default list is harmless.
    def __init__(self, idxs=[]):
        super(CatalogTool, self).__init__()
        self._catalog = DelegatedCatalog(self)
        self.solr_url = 'http://localhost:8983/solr'
        self.delegatedIndexes = ('Title', 'Description', 'SearchableText')

    security.declarePrivate('solrAdd')
    def solrAdd(self, object, idxs=[], uid=None):
        """Send ``object`` to Solr as a document.

        object -- the content object (or an already indexable wrapper).
        idxs -- names of the attributes to send; an empty list means all
                of ``delegatedIndexes``.  The list is never mutated.
        uid -- Solr document id; computed from the object when false.

        The Solr commit is deferred to an after-commit hook of the current
        transaction.
        """
        if IIndexableObject.providedBy(object):
            w = object
        else:
            w = queryMultiAdapter((object, self), IIndexableObject)
            if w is None:
                # BBB
                w = IndexableObjectWrapper(object, self)

        # NOTE(review): ``self.__url`` is name-mangled to
        # ``_CatalogTool__url``; presumably it resolves to the base class's
        # private method because that class is also named ``CatalogTool``.
        uid = uid if uid else self.__url(object)
        idxs = idxs if idxs != [] else self.delegatedIndexes
        data = {'id': uid}
        for name in idxs:
            attr = getattr(w, name, '')
            data[name] = attr() if callable(attr) else attr

        c = SolrConnection(self.solr_url)
        try:
            c.add(**data)
            transaction.get().addAfterCommitHook(SolrTransactionHook(c))
        except Exception:
            # The hook never took ownership of the connection: close it
            # here so it is not leaked, then let the error propagate.
            c.close()
            raise

    # PortalCatalog api overloads
    security.declareProtected(ModifyPortalContent, 'indexObject')
    def indexObject(self, object):
        """ Add to catalog and send to Solr """
        super(CatalogTool, self).indexObject(object)
        self.solrAdd(object)

    security.declarePrivate('reindexObject')
    def reindexObject(self, object, idxs=[], update_metadata=1, uid=None):
        """Reindex in the catalog, then refresh the Solr document if needed.

        Solr is updated when ``idxs`` is empty (full reindex) or names at
        least one index that is both a catalog index and a delegated one.
        """
        super(CatalogTool, self).reindexObject(object,
                                               idxs=idxs,
                                               update_metadata=update_metadata,
                                               uid=uid)
        if idxs != []:
            # Filter out invalid indexes.
            valid_indexes = self._catalog.indexes.keys()
            affected = [i for i in idxs
                        if i in valid_indexes and i in self.delegatedIndexes]
        else:
            affected = self.delegatedIndexes

        if affected:
            # A Solr add replaces the whole document stored under the id,
            # so always send every delegated field: sending only the
            # affected ones would erase the others from Solr.
            self.solrAdd(object, idxs=self.delegatedIndexes, uid=uid)

    security.declarePrivate('unindexObject')
    def unindexObject(self, object):
        """Remove from catalog and from Solr.
        """
        super(CatalogTool, self).unindexObject(object)
        c = SolrConnection(self.solr_url)
        try:
            url = self.__url(object)
            c.delete(id=url)
            transaction.get().addAfterCommitHook(SolrTransactionHook(c))
        except Exception:
            # Same ownership rule as in solrAdd: do not leak the connection.
            c.close()
            raise

InitializeClass(CatalogTool)
105
106
class DelegatedCatalog(Catalog):
    """This is where searches are actually delegated to Solr.

    Criteria on the owning tool's ``delegatedIndexes`` are answered by a
    Solr query; the remaining criteria are applied by the regular catalog
    indexes on top of that result.
    """

    def __init__(self, zcat, brains=None):
        """Create the catalog.

        zcat -- the owning catalog tool; its ``solr_url`` and
                ``delegatedIndexes`` attributes are read at search time.
        brains -- passed through to ``Catalog.__init__``.
        """
        Catalog.__init__(self, brains=brains)
        self.zcat = zcat

    def delegateSearch(self, query, plan):
        """Answer the delegated criteria of ``query`` with Solr.

        The delegated criteria are removed from ``query`` and from ``plan``
        (both are modified in place) so that the regular indexes do not
        process them again.

        Falsy return values:
        None -- nothing was delegated; the other indexes must still be
                queried.
        empty IISet() -- the delegated search found nothing; the search
                can stop.

        Otherwise returns an IISet of catalog record ids.
        """
        # Pick the delegated criteria from the query itself rather than from
        # ``plan``: ``query.pop`` can then never raise KeyError for a name
        # that is planned but absent, and a plan that lacks a delegated
        # index cannot cause its criterion to be silently dropped.
        delegated = sorted(set(self.zcat.delegatedIndexes).intersection(query))
        if not delegated:
            return None

        clauses = []
        for name in delegated:
            value = query.pop(name)
            if name in plan:
                plan.remove(name)
            # Inside a quoted Solr phrase, backslash and double quote must be
            # escaped, otherwise user text can break or alter the query.
            text = ('%s' % (value,)).replace('\\', '\\\\').replace('"', '\\"')
            clauses.append('%s:"%s"' % (name, text))
        q = ' AND '.join(clauses)

        c = SolrConnection(self.zcat.solr_url)
        try:
            resp = c.query(q, fields='id', rows=len(self))
        finally:
            c.close()

        # Map Solr document ids back to record ids.  Unknown ids are skipped;
        # test against None explicitly so that a record id of 0 is kept.
        rids = [self.uids.get(doc['id']) for doc in resp.results]
        return IISet([rid for rid in rids if rid is not None])

    def search(self, query, sort_index=None, reverse=0, limit=None, merge=1):
        """Iterate through the indexes, applying the query to each one. If
        merge is true then return a lazy result set (sorted if appropriate)
        otherwise return the raw (possibly scored) results for later merging.
        Limit is used in conjunction with sorting or scored results to inform
        the catalog how many results you are really interested in. The catalog
        can then use optimizations to save time and memory. The number of
        results is not guaranteed to fall within the limit however, you should
        still slice or batch the results as usual.

        Criteria on delegated indexes are resolved first through
        ``delegateSearch``; its result seeds the result set that the
        remaining indexes are intersected with."""

        rs = None  # resultset

        # Indexes fulfill a fairly large contract here. We hand each
        # index the query mapping we are given (which may be composed
        # of some combination of web request, kw mappings or plain old dicts)
        # and the index decides what to do with it. If the index finds work
        # for itself in the query, it returns the results and a tuple of
        # the attributes that were used. If the index finds nothing for it
        # to do then it returns None.

        # Canonicalize the request into a sensible query before passing it on
        query = self.make_query(query)

        cr = self.getCatalogPlan(query)
        cr.start()

        plan = cr.plan()
        if not plan:
            plan = self._sorted_search_indexes(query)

        # Delegation: an empty (but not None) result means Solr matched
        # nothing, so the whole search is empty.
        rs = self.delegateSearch(query, plan)
        if rs is not None and not rs:
            return LazyCat([])

        indexes = self.indexes.keys()
        for i in plan:
            if i not in indexes:
                # We can have bogus keys or the plan can contain index names
                # that have been removed in the meantime
                continue

            index = self.getIndex(i)
            _apply_index = getattr(index, "_apply_index", None)
            if _apply_index is None:
                continue

            cr.start_split(i)
            limit_result = ILimitedResultIndex.providedBy(index)
            if limit_result:
                r = _apply_index(query, rs)
            else:
                r = _apply_index(query)

            if r is not None:
                r, u = r
                # Short circuit if empty result
                # BBB: We can remove the "r is not None" check in Zope 2.14
                # once we don't need to support the "return everything" case
                # anymore
                if r is not None and not r:
                    cr.stop_split(i, result=None, limit=limit_result)
                    return LazyCat([])

                # provide detailed info about the pure intersection time
                intersect_id = i + '#intersection'
                cr.start_split(intersect_id)
                # weightedIntersection preserves the values from any mappings
                # we get, as some indexes don't return simple sets
                if hasattr(rs, 'items') or hasattr(r, 'items'):
                    _, rs = weightedIntersection(rs, r)
                else:
                    rs = intersection(rs, r)

                cr.stop_split(intersect_id)

                # consider the time it takes to intersect the index result with
                # the total resultset to be part of the index time
                cr.stop_split(i, result=r, limit=limit_result)
                if not rs:
                    break
            else:
                cr.stop_split(i, result=None, limit=limit_result)

        # Try to deduce the sort limit from batching arguments
        b_start = int(query.get('b_start', 0))
        b_size = query.get('b_size', None)
        if b_size is not None:
            b_size = int(b_size)

        if b_size is not None:
            limit = b_start + b_size
        elif limit and b_size is None:
            b_size = limit

        if rs is None:
            # None of the indexes found anything to do with the query
            # We take this to mean that the query was empty (an empty filter)
            # and so we return everything in the catalog
            warnings.warn('Your query %s produced no query restriction. '
                          'Currently the entire catalog content is returned. '
                          'In Zope 2.14 this will result in an empty LazyCat '
                          'to be returned.' % repr(cr.make_key(query)),
                          DeprecationWarning, stacklevel=3)

            rlen = len(self)
            if sort_index is None:
                sequence, slen = self._limit_sequence(self.data.items(), rlen,
                                                      b_start, b_size)
                result = LazyMap(self.instantiate, sequence, slen,
                                 actual_result_count=rlen)
            else:
                cr.start_split('sort_on')
                result = self.sortResults(
                    self.data, sort_index, reverse, limit, merge,
                    actual_result_count=rlen, b_start=b_start,
                    b_size=b_size)
                cr.stop_split('sort_on', None)
        elif rs:
            # We got some results from the indexes.
            # Sort and convert to sequences.
            # XXX: The check for 'values' is really stupid since we call
            # items() and *not* values()
            rlen = len(rs)
            if sort_index is None and hasattr(rs, 'items'):
                # having a 'items' means we have a data structure with
                # scores. Build a new result set, sort it by score, reverse
                # it, compute the normalized score, and Lazify it.

                if not merge:
                    # Don't bother to sort here, return a list of
                    # three tuples to be passed later to mergeResults
                    # note that data_record_normalized_score_ cannot be
                    # calculated and will always be 1 in this case
                    getitem = self.__getitem__
                    result = [(score, (1, score, rid), getitem)
                              for rid, score in rs.items()]
                else:
                    cr.start_split('sort_on')

                    rs = rs.byValue(0)  # sort it by score
                    max_score = float(rs[0][0])

                    # Here we define our getter function inline so that
                    # we can conveniently store the max value as a default arg
                    # and make the normalized score computation lazy
                    def getScoredResult(item, max_score=max_score, self=self):
                        """
                        Returns instances of self._v_brains, or whatever is
                        passed into self.useBrains.
                        """
                        score, key = item
                        # The ``aq_parent`` function is not imported in this
                        # module, so calling it raised NameError.  Read the
                        # acquisition wrapper attribute instead; it is None
                        # when the catalog is not wrapped.
                        parent = getattr(self, 'aq_parent', None)
                        r = self._v_result_class(self.data[key]).__of__(parent)
                        r.data_record_id_ = key
                        r.data_record_score_ = score
                        r.data_record_normalized_score_ = int(
                            100. * score / max_score)
                        return r

                    sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                          b_size)
                    result = LazyMap(getScoredResult, sequence, slen,
                                     actual_result_count=rlen)
                    cr.stop_split('sort_on', None)

            elif sort_index is None and not hasattr(rs, 'values'):
                # no scores
                if hasattr(rs, 'keys'):
                    rs = rs.keys()
                sequence, slen = self._limit_sequence(rs, rlen, b_start,
                                                      b_size)
                result = LazyMap(self.__getitem__, sequence, slen,
                                 actual_result_count=rlen)
            else:
                # sort.  If there are scores, then this block is not
                # reached, therefore 'sort-on' does not happen in the
                # context of a text index query.  This should probably
                # sort by relevance first, then the 'sort-on' attribute.
                cr.start_split('sort_on')
                result = self.sortResults(rs, sort_index, reverse, limit,
                                          merge, actual_result_count=rlen,
                                          b_start=b_start, b_size=b_size)
                cr.stop_split('sort_on', None)
        else:
            # Empty result set
            result = LazyCat([])
        cr.stop()
        return result