[sup-talk] [PATCH 1/2] move all GDBM data into Xapian

Rich Lane rlane at club.cc.cmu.edu
Sun Aug 16 15:35:30 EDT 2009


Keeping everything in Xapian means much better consistency in case of a crash.
Thread killing is now supported.
---
 lib/sup/xapian_index.rb |  177 +++++++++++++++++++++++++++++------------------
 1 files changed, 110 insertions(+), 67 deletions(-)

diff --git a/lib/sup/xapian_index.rb b/lib/sup/xapian_index.rb
index 861c2a3..825e7a3 100644
--- a/lib/sup/xapian_index.rb
+++ b/lib/sup/xapian_index.rb
@@ -1,5 +1,4 @@
 require 'xapian'
-require 'gdbm'
 require 'set'
 
 module Redwood
@@ -23,12 +22,6 @@ class XapianIndex < BaseIndex
   end
 
   def load_index
-    @entries = MarshalledGDBM.new File.join(@dir, "entries.db")
-    @docids = MarshalledGDBM.new File.join(@dir, "docids.db")
-    @thread_members = MarshalledGDBM.new File.join(@dir, "thread_members.db")
-    @thread_ids = MarshalledGDBM.new File.join(@dir, "thread_ids.db")
-    @assigned_docids = GDBM.new File.join(@dir, "assigned_docids.db")
-
     @xapian = Xapian::WritableDatabase.new(File.join(@dir, "xapian"), Xapian::DB_CREATE_OR_OPEN)
     @term_generator = Xapian::TermGenerator.new()
     @term_generator.stemmer = Xapian::Stem.new(STEM_LANGUAGE)
@@ -48,19 +41,19 @@ class XapianIndex < BaseIndex
   end
 
   def contains_id? id
-    synchronize { @entries.member? id }
+    synchronize { find_docid(id) && true }
   end
 
   def source_for_id id
-    synchronize { @entries[id][:source_id] }
+    synchronize { get_entry(id)[:source_id] }
   end
 
   def delete id
-    synchronize { @xapian.delete_document @docids[id] }
+    synchronize { @xapian.delete_document mkterm(:msgid, id) }
   end
 
   def build_message id
-    entry = synchronize { @entries[id] }
+    entry = synchronize { get_entry id }
     return unless entry
 
     source = SourceManager[entry[:source_id]]
@@ -88,7 +81,7 @@ class XapianIndex < BaseIndex
   end
 
   def sync_message m, opts={}
-    entry = synchronize { @entries[m.id] }
+    entry = synchronize { get_entry m.id }
     snippet = m.snippet
     entry ||= {}
     labels = m.labels
@@ -113,9 +106,7 @@ class XapianIndex < BaseIndex
     m.labels.each { |l| LabelManager << l }
 
     synchronize do
-      index_message m, opts
-      union_threads([m.id] + m.refs + m.replytos)
-      @entries[m.id] = d
+      index_message m, d, opts
     end
     true
   end
@@ -147,8 +138,26 @@ class XapianIndex < BaseIndex
   def each_message_in_thread_for m, opts={}
     # TODO thread by subject
     # TODO handle killed threads
-    ids = synchronize { @thread_members[@thread_ids[m.id]] } || []
-    ids.select { |id| contains_id? id }.each { |id| yield id, lambda { build_message id } }
+    return unless doc = find_doc(m.id)
+    queue = doc.value(THREAD_VALUENO).split(',')
+    msgids = [m.id]
+    seen_threads = Set.new
+    seen_messages = Set.new [m.id]
+    while not queue.empty?
+      thread_id = queue.pop
+      next if seen_threads.member? thread_id
+      return false if thread_killed? thread_id 
+      seen_threads << thread_id
+      docs = term_docids(mkterm(:thread, thread_id)).map { |x| @xapian.document x }
+      docs.each do |doc|
+        msgid = doc.value MSGID_VALUENO
+        next if seen_messages.member? msgid
+        msgids << msgid
+        seen_messages << msgid
+        queue.concat doc.value(THREAD_VALUENO).split(',')
+      end
+    end
+    msgids.each { |id| yield id, lambda { build_message id } }
     true
   end
 
@@ -297,11 +306,16 @@ class XapianIndex < BaseIndex
     'label' => 'L',
     'source_id' => 'I',
     'attachment_extension' => 'O',
+    'msgid' => 'Q',
+    'thread' => 'H',
+    'ref' => 'R',
   }
 
   PREFIX = NORMAL_PREFIX.merge BOOLEAN_PREFIX
 
-  DATE_VALUENO = 0
+  MSGID_VALUENO = 0
+  THREAD_VALUENO = 1
+  DATE_VALUENO = 2
 
   MAX_TERM_LENGTH = 245
 
@@ -317,14 +331,48 @@ class XapianIndex < BaseIndex
   def assign_docid m, truncated_date
     t = (truncated_date.to_i - MIDDLE_DATE.to_i).to_f
     docid = (DOCID_SCALE - DOCID_SCALE/(Math::E**(-(t/TIME_SCALE)) + 1)).to_i
+    while docid > 0 and docid_exists? docid
+      docid -= 1
+    end
+    docid > 0 ? docid : nil
+  end
+
+  # XXX is there a better way?
+  def docid_exists? docid
     begin
-      while @assigned_docids.member? [docid].pack("N")
-        docid -= 1
-      end
-    rescue
+      @xapian.doclength docid
+      true
+    rescue RuntimeError #Xapian::DocNotFoundError
+      raise unless $!.message =~ /DocNotFoundError/
+      false
     end
-    @assigned_docids[[docid].pack("N")] = ''
-    docid
+  end
+
+  def term_docids term
+    @xapian.postlist(term).map { |x| x.docid }
+  end
+
+  def find_docid id
+    term_docids(mkterm(:msgid,id)).tap { |x| fail unless x.size <= 1 }.first
+  end
+
+  def find_doc id
+    return unless docid = find_docid(id)
+    @xapian.document docid
+  end
+
+  def get_id docid
+    return unless doc = @xapian.document(docid)
+    doc.value MSGID_VALUENO
+  end
+
+  def get_entry id
+    return unless doc = find_doc(id)
+    Marshal.load doc.data
+  end
+
+  def thread_killed? thread_id
+    not run_query(Q.new(Q::OP_AND, mkterm(:thread, thread_id), mkterm(:label, :Killed)), 0, 1).empty?
   end
 
   def synchronize &b
@@ -340,7 +388,7 @@ class XapianIndex < BaseIndex
 
   def run_query_ids xapian_query, offset, limit
     matchset = run_query xapian_query, offset, limit
-    matchset.matches.map { |r| r.document.data }
+    matchset.matches.map { |r| r.document.value MSGID_VALUENO }
   end
 
   Q = Xapian::Query
@@ -371,7 +419,7 @@ class XapianIndex < BaseIndex
     end
   end
 
-  def index_message m, opts
+  def index_message m, entry, opts
     terms = []
     text = []
 
@@ -394,6 +442,7 @@ class XapianIndex < BaseIndex
     terms << mkterm(:date,m.date) if m.date
     m.labels.each { |t| terms << mkterm(:label,t) }
     terms << mkterm(:type, 'mail')
+    terms << mkterm(:msgid, m.id)
     terms << mkterm(:source_id, m.source.id)
     m.attachments.each do |a|
       a =~ /\.(\w+)$/ or next
@@ -401,6 +450,23 @@ class XapianIndex < BaseIndex
       terms << t
     end
 
+    ## Thread membership
+    children = term_docids(mkterm(:ref, m.id)).map { |docid| @xapian.document docid }
+    parent_ids = m.refs + m.replytos
+    parents = parent_ids.map { |id| find_doc id }.compact
+    thread_members = SavingHash.new { [] }
+    (children + parents).each do |doc2|
+      thread_ids = doc2.value(THREAD_VALUENO).split ','
+      thread_ids.each { |thread_id| thread_members[thread_id] << doc2 }
+    end
+
+    thread_ids = thread_members.empty? ? [m.id] : thread_members.keys
+
+    thread_ids.each { |thread_id| terms << mkterm(:thread, thread_id) }
+    parent_ids.each do |ref|
+      terms << mkterm(:ref, ref)
+    end
+
     # Full text search content
     text << [subject_text, PREFIX['subject']]
     text << [subject_text, PREFIX['body']]
@@ -424,17 +490,29 @@ class XapianIndex < BaseIndex
       Xapian.sortable_serialise 0
     end
 
-    doc = Xapian::Document.new
-    docid = @docids[m.id] || assign_docid(m, truncated_date)
+    docid = nil
+    unless doc = find_doc(m.id)
+      doc = Xapian::Document.new
+      if not docid = assign_docid(m, truncated_date)
+        # Could be triggered by spam
+        Redwood::log "warning: docid underflow, dropping #{m.id.inspect}"
+        return
+      end
+    else
+      doc.clear_terms
+      doc.clear_values
+      docid = doc.docid
+    end
 
     @term_generator.document = doc
     text.each { |text,prefix| @term_generator.index_text text, 1, prefix }
     terms.each { |term| doc.add_term term if term.length <= MAX_TERM_LENGTH }
+    doc.add_value MSGID_VALUENO, m.id
+    doc.add_value THREAD_VALUENO, (thread_ids * ',')
     doc.add_value DATE_VALUENO, date_value
-    doc.data = m.id
+    doc.data = Marshal.dump entry
 
     @xapian.replace_document docid, doc
-    @docids[m.id] = docid
   end
 
   # Construct a Xapian term
@@ -457,48 +535,13 @@ class XapianIndex < BaseIndex
       PREFIX['source_id'] + args[0].to_s.downcase
     when :attachment_extension
       PREFIX['attachment_extension'] + args[0].to_s.downcase
+    when :msgid, :ref, :thread
+      PREFIX[type.to_s] + args[0][0...(MAX_TERM_LENGTH-1)]
     else
       raise "Invalid term type #{type}"
     end
   end
 
-  # Join all the given message-ids into a single thread
-  def union_threads ids
-    seen_threads = Set.new
-    related = Set.new
-
-    # Get all the ids that will be in the new thread
-    ids.each do |id|
-      related << id
-      thread_id = @thread_ids[id]
-      if thread_id && !seen_threads.member?(thread_id)
-        thread_members = @thread_members[thread_id]
-        related.merge thread_members
-        seen_threads << thread_id
-      end
-    end
-
-    # Pick a leader and move all the others to its thread
-    a = related.to_a
-    best, *rest = a.sort_by { |x| x.hash }
-    @thread_members[best] = a
-    @thread_ids[best] = best
-    rest.each do |x|
-      @thread_members.delete x
-      @thread_ids[x] = best
-    end
-  end
 end
 
 end
-
-class MarshalledGDBM < GDBM
-  def []= k, v
-    super k, Marshal.dump(v)
-  end
-
-  def [] k
-    v = super k
-    v ? Marshal.load(v) : nil
-  end
-end
-- 
1.6.4



More information about the sup-talk mailing list